Gcp Observability Logging: Batching and Graceful close (#32436)

This PR adds batching support for GCP Observability logging. Instead of
naively creating a new RPC to Cloud Logging for each logging event, we
now batch the log events and flush a batch once any one of the following
conditions is met -
* Batch size of 1000
* Batch memory consumption of 1MB
* A timeout period of 1sec after which we flush the accumulated batch
irrespective of the size.

There can also be cases where, for some reason, the RPCs fail or the batch
accumulates to a very large size (100000 entries or 10MB in size).
In such cases, we log the events with gpr_log instead of continuing to
accumulate them.

Additionally, `GcpObservabilityClose()` has been added to gracefully
shut down logging: it blocks until all the currently logged events are
flushed. (We might be able to gracefully shut down stats and tracing in
the future too.)

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32460/head^2
Yash Tibrewal 2 years ago committed by GitHub
parent 14083ed106
commit bf23bb2fa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      include/grpcpp/ext/gcp_observability.h
  2. 1
      src/cpp/ext/filters/logging/BUILD
  3. 2
      src/cpp/ext/filters/logging/logging_filter.cc
  4. 1
      src/cpp/ext/filters/logging/logging_sink.h
  5. 5
      src/cpp/ext/gcp/BUILD
  6. 17
      src/cpp/ext/gcp/observability.cc
  7. 225
      src/cpp/ext/gcp/observability_logging_sink.cc
  8. 45
      src/cpp/ext/gcp/observability_logging_sink.h

@ -25,6 +25,10 @@ namespace experimental {
// Initialize GCP Observability for gRPC.
absl::Status GcpObservabilityInit();
// Gracefully shuts down GCP Observability.
// Note that graceful shutdown for stats and tracing is not yet supported.
void GcpObservabilityClose();
} // namespace experimental
} // namespace grpc

@ -86,5 +86,6 @@ grpc_cc_library(
"//src/core:poll",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:time",
],
)

@ -54,6 +54,7 @@
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h"
@ -311,6 +312,7 @@ class CallData {
entry->peer = peer_;
entry->service_name = service_name_;
entry->method_name = method_name_;
entry->timestamp = grpc_core::Timestamp::Now();
}
uint64_t call_id_;
uint32_t sequence_id_ = 0;

@ -102,6 +102,7 @@ class LoggingSink {
std::string authority;
std::string service_name;
std::string method_name;
grpc_core::Timestamp timestamp;
};
virtual ~LoggingSink() = default;

@ -101,7 +101,7 @@ grpc_cc_library(
"observability_logging_sink.h",
],
external_deps = [
"absl/base",
"absl/base:core_headers",
"absl/strings",
"absl/strings:str_format",
"absl/types:optional",
@ -113,11 +113,14 @@ grpc_cc_library(
language = "c++",
visibility = ["//test:__subpackages__"],
deps = [
"environment_autodetect",
"observability_config",
"//:event_engine_base_hdrs",
"//:gpr",
"//:gpr_platform",
"//:grpc++",
"//:grpc_opencensus_plugin",
"//src/core:default_event_engine",
"//src/core:env",
"//src/core:json",
"//src/core:time",

@ -51,6 +51,9 @@ namespace grpc {
namespace experimental {
namespace {
grpc::internal::ObservabilityLoggingSink* g_logging_sink = nullptr;
// TODO(yashykt): These constants are currently derived from the example at
// https://cloud.google.com/traffic-director/docs/observability-proxyless#c++.
// We might want these to be configurable.
@ -89,7 +92,6 @@ absl::Status GcpObservabilityInit() {
!config->cloud_logging.has_value()) {
return absl::OkStatus();
}
grpc::internal::OpenCensusRegistry::Get().RegisterWaitOnReady();
grpc::internal::EnvironmentAutoDetect::Create(config->project_id);
grpc::RegisterOpenCensusPlugin();
if (config->cloud_trace.has_value()) {
@ -136,6 +138,7 @@ absl::Status GcpObservabilityInit() {
// If tracing or monitoring is enabled, we need to get the OpenCensus plugin
// to wait for the environment to be autodetected.
if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) {
grpc::internal::OpenCensusRegistry::Get().RegisterWaitOnReady();
grpc::internal::OpenCensusRegistry::Get().RegisterFunctions(
[config_labels = config->labels]() mutable {
grpc::internal::EnvironmentAutoDetect::Get().NotifyOnDone(
@ -152,12 +155,18 @@ absl::Status GcpObservabilityInit() {
});
}
if (config->cloud_logging.has_value()) {
grpc::internal::RegisterLoggingFilter(
new grpc::internal::ObservabilityLoggingSink(
config->cloud_logging.value(), config->project_id, config->labels));
g_logging_sink = new grpc::internal::ObservabilityLoggingSink(
config->cloud_logging.value(), config->project_id, config->labels);
grpc::internal::RegisterLoggingFilter(g_logging_sink);
}
return absl::OkStatus();
}
// Gracefully shuts down GCP Observability logging by asking the registered
// logging sink (if one was created) to flush and close. Does nothing when no
// logging sink has been set up.
void GcpObservabilityClose() {
  if (g_logging_sink == nullptr) return;
  g_logging_sink->FlushAndClose();
}
} // namespace experimental
} // namespace grpc

@ -36,6 +36,7 @@
#include "google/logging/v2/log_entry.pb.h"
#include "google/logging/v2/logging.grpc.pb.h"
#include "google/logging/v2/logging.pb.h"
#include "google/protobuf/text_format.h"
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@ -44,6 +45,7 @@
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
@ -228,62 +230,211 @@ void EntryToJsonStructProto(LoggingSink::Entry entry,
std::move(entry.method_name));
}
namespace {
// Returns a rough estimate, in bytes, of the memory held by `entry`: the size
// of the struct itself plus the lengths of its metadata keys/values, status
// message, status details, message payload, authority, service name and
// method name.
uint64_t EstimateEntrySize(const LoggingSink::Entry& entry) {
  const auto& payload = entry.payload;
  uint64_t metadata_bytes = 0;
  for (const auto& key_value : payload.metadata) {
    metadata_bytes += key_value.first.size() + key_value.second.size();
  }
  return sizeof(entry) + metadata_bytes + payload.status_message.size() +
         payload.status_details.size() + payload.message.size() +
         entry.authority.size() + entry.service_name.size() +
         entry.method_name.size();
}
} // namespace
void ObservabilityLoggingSink::LogEntry(Entry entry) {
absl::call_once(once_, [this]() {
std::string endpoint;
absl::optional<std::string> endpoint_env =
grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
if (endpoint_env.has_value() && !endpoint_env->empty()) {
endpoint = std::move(*endpoint_env);
} else {
endpoint = "logging.googleapis.com";
auto entry_size = EstimateEntrySize(entry);
grpc_core::MutexLock lock(&mu_);
if (sink_closed_) return;
entries_.push_back(std::move(entry));
entries_memory_footprint_ += entry_size;
MaybeTriggerFlushLocked();
}
// Stores the detected environment resource and then re-evaluates whether the
// buffered entries should be flushed, all while holding mu_.
void ObservabilityLoggingSink::RegisterEnvironmentResource(
    const EnvironmentAutoDetect::ResourceType* resource) {
  grpc_core::MutexLock lock(&mu_);
  resource_ = resource;
  MaybeTriggerFlushLocked();
}
// Closes the sink so that LogEntry() drops any further entries, triggers a
// flush of whatever is still buffered, and blocks until the buffer has been
// drained. Returns immediately if nothing is buffered at the time of the call.
void ObservabilityLoggingSink::FlushAndClose() {
  grpc_core::MutexLock lock(&mu_);
  sink_closed_ = true;
  if (entries_.empty()) return;
  MaybeTriggerFlushLocked();
  // Condition variable waits may wake up spuriously, so re-check the buffer
  // after every wakeup instead of trusting a single Wait(). The flush
  // completion callback signals once the sink is closed and entries_ is empty.
  // flush_in_progress_ is deliberately not part of the predicate: Flush() sets
  // it before handing off a batch, and FlushEntriesHelper() returns early on
  // an empty batch without clearing it, so waiting on it could block forever.
  while (!entries_.empty()) {
    sink_flushed_after_close_.Wait(&mu_);
  }
}
void ObservabilityLoggingSink::Flush() {
std::vector<Entry> entries;
google::logging::v2::LoggingServiceV2::StubInterface* stub = nullptr;
const EnvironmentAutoDetect::ResourceType* resource = nullptr;
{
grpc_core::MutexLock lock(&mu_);
if (flush_in_progress_) {
return;
}
flush_in_progress_ = true;
flush_timer_in_progress_ = false;
flush_triggered_ = false;
if (stub_ == nullptr) {
std::string endpoint;
absl::optional<std::string> endpoint_env =
grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
if (endpoint_env.has_value() && !endpoint_env->empty()) {
endpoint = std::move(*endpoint_env);
} else {
endpoint = "logging.googleapis.com";
}
ChannelArguments args;
// Disable observability for RPCs on this channel
args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
// Set keepalive time to 24 hrs to effectively disable keepalive ping, but
// still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect.
args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS,
24 * 60 * 60 * 1000 /* 24 hours */);
args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */);
stub_ = google::logging::v2::LoggingServiceV2::NewStub(
CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args));
}
ChannelArguments args;
// Disable observability for RPCs on this channel
args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
// Set keepalive time to 24 hrs to effectively disable keepalive ping, but
// still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect.
args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 24 * 60 * 60 * 1000 /* 24 hours */);
args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */);
stub_ = google::logging::v2::LoggingServiceV2::NewStub(
CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args));
});
stub = stub_.get();
entries = std::move(entries_);
entries_memory_footprint_ = 0;
resource = resource_;
}
FlushEntriesHelper(stub, std::move(entries), resource);
}
void ObservabilityLoggingSink::FlushEntriesHelper(
google::logging::v2::LoggingServiceV2::StubInterface* stub,
std::vector<Entry> entries,
const EnvironmentAutoDetect::ResourceType* resource) {
if (entries.empty()) {
return;
}
struct CallContext {
ClientContext context;
google::logging::v2::WriteLogEntriesRequest request;
google::logging::v2::WriteLogEntriesResponse response;
};
// TODO(yashykt): Implement batching so that we can batch a bunch of log
// entries into a single entry. Also, set a reasonable deadline on the
// context, and actually use the entry.
CallContext* call = new CallContext;
call->context.set_authority(authority_);
call->context.set_deadline(
(grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30))
.as_timespec(GPR_CLOCK_MONOTONIC));
call->request.set_log_name(
absl::StrFormat("projects/%s/logs/"
"microservices.googleapis.com%%2Fobservability%%2fgrpc",
project_id_));
(*call->request.mutable_labels()).insert(labels_.begin(), labels_.end());
// TODO(yashykt): Figure out the proper resource type and labels.
call->request.mutable_resource()->set_type("global");
auto* proto_entry = call->request.add_entries();
// Fill the current timestamp
gpr_timespec timespec =
grpc_core::Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME);
proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec);
proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec);
// TODO(yashykt): Check if we need to fill receive timestamp
EntryToJsonStructProto(std::move(entry), proto_entry->mutable_json_payload());
stub_->async()->WriteLogEntries(
// Set the proper resource type and labels.
call->request.mutable_resource()->set_type(resource->resource_type);
call->request.mutable_resource()->mutable_labels()->insert(
resource->labels.begin(), resource->labels.end());
for (auto& entry : entries) {
auto* proto_entry = call->request.add_entries();
gpr_timespec timespec = entry.timestamp.as_timespec(GPR_CLOCK_REALTIME);
proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec);
proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec);
// TODO(yashykt): Check if we need to fill receive timestamp
EntryToJsonStructProto(std::move(entry),
proto_entry->mutable_json_payload());
}
stub->async()->WriteLogEntries(
&(call->context), &(call->request), &(call->response),
[call](Status status) {
[this, call](Status status) {
if (!status.ok()) {
// TODO(yashykt): Log the contents of the
// request on a failure.
gpr_log(GPR_ERROR, "GCP Observability Logging Error %d: %s",
status.error_code(), status.error_message().c_str());
gpr_log(
GPR_ERROR,
"GCP Observability Logging Error %d: %s. Dumping log entries.",
status.error_code(), status.error_message().c_str());
for (auto& entry : call->request.entries()) {
std::string output;
::google::protobuf::TextFormat::PrintToString(entry.json_payload(),
&output);
gpr_log(
GPR_INFO, "Log Entry recorded at time: %s : %s",
grpc_core::Timestamp::FromTimespecRoundUp(
gpr_timespec{entry.timestamp().seconds(),
entry.timestamp().nanos(), GPR_CLOCK_REALTIME})
.ToString()
.c_str(),
output.c_str());
}
}
delete call;
grpc_core::MutexLock lock(&mu_);
flush_in_progress_ = false;
if (sink_closed_ && entries_.empty()) {
sink_flushed_after_close_.SignalAll();
} else {
MaybeTriggerFlushLocked();
}
});
}
// Convenience wrapper that acquires mu_ and then runs the flush-trigger
// logic in MaybeTriggerFlushLocked().
void ObservabilityLoggingSink::MaybeTriggerFlush() {
  grpc_core::MutexLock lock(&mu_);
  MaybeTriggerFlushLocked();
}
// Decides what to do with the currently buffered entries. Must be called with
// mu_ held.
//  * If the buffer has grown past the dump limits, the entries are written out
//    with gpr_log and discarded.
//  * Otherwise, once the environment resource is known and no flush is in
//    progress, either an immediate flush is scheduled (batch thresholds
//    reached or sink closed) or a one second flush timer is started.
void ObservabilityLoggingSink::MaybeTriggerFlushLocked() {
  constexpr int kMaxEntriesBeforeDump = 100000;
  constexpr int kMaxMemoryFootprintBeforeDump = 10 * 1024 * 1024;
  constexpr int kMinEntriesBeforeFlush = 1000;
  constexpr int kMinMemoryFootprintBeforeFlush = 1 * 1024 * 1024;
  // Take this chance to look up the environment resource (and grab the event
  // engine) if that has not happened yet. If the resource is not available,
  // register once to be notified when detection completes.
  if (resource_ == nullptr && !registered_env_fetch_notification_) {
    auto& autodetect = EnvironmentAutoDetect::Get();
    resource_ = autodetect.resource();
    event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
    if (resource_ == nullptr) {
      registered_env_fetch_notification_ = true;
      autodetect.NotifyOnDone([this]() {
        RegisterEnvironmentResource(EnvironmentAutoDetect::Get().resource());
      });
    }
  }
  if (entries_.empty()) return;
  const bool buffer_limit_exceeded =
      entries_.size() > kMaxEntriesBeforeDump ||
      entries_memory_footprint_ > kMaxMemoryFootprintBeforeDump;
  if (buffer_limit_exceeded) {
    // Too much has accumulated; dump everything through gpr_log and start
    // over with an empty buffer.
    gpr_log(GPR_INFO, "Buffer limit reached. Dumping log entries.");
    for (auto& entry : entries_) {
      google::protobuf::Struct proto;
      // Capture the timestamp before the entry is moved from.
      std::string timestamp = entry.timestamp.ToString();
      EntryToJsonStructProto(std::move(entry), &proto);
      std::string output;
      ::google::protobuf::TextFormat::PrintToString(proto, &output);
      gpr_log(GPR_INFO, "Log Entry recorded at time: %s : %s",
              timestamp.c_str(), output.c_str());
    }
    entries_.clear();
    entries_memory_footprint_ = 0;
    return;
  }
  // Nothing can be sent until the environment resource is known, and only one
  // flush runs at a time.
  if (resource_ == nullptr || flush_in_progress_) return;
  const bool flush_immediately =
      entries_.size() >= kMinEntriesBeforeFlush ||
      entries_memory_footprint_ >= kMinMemoryFootprintBeforeFlush ||
      sink_closed_;
  if (flush_immediately && !flush_triggered_) {
    // A timer-based flush may already be pending; that is fine. What matters
    // is that a flush gets triggered.
    flush_triggered_ = true;
    event_engine_->Run([this]() { Flush(); });
    return;
  }
  if (!flush_timer_in_progress_) {
    flush_timer_in_progress_ = true;
    event_engine_->RunAfter(grpc_core::Duration::Seconds(1),
                            [this]() { Flush(); });
  }
}
ObservabilityLoggingSink::Configuration::Configuration(

@ -31,11 +31,15 @@
#include <google/protobuf/struct.pb.h>
#include "absl/base/call_once.h"
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"
#include "google/logging/v2/logging.grpc.pb.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/cpp/ext/filters/logging/logging_sink.h"
#include "src/cpp/ext/gcp/environment_autodetect.h"
#include "src/cpp/ext/gcp/observability_config.h"
namespace grpc {
@ -55,6 +59,10 @@ class ObservabilityLoggingSink : public LoggingSink {
void LogEntry(Entry entry) override;
// Triggers a final flush of all the currently buffered logging entries and
// closes the sink, preventing any more entries from being logged.
void FlushAndClose();
private:
struct Configuration {
explicit Configuration(
@ -70,13 +78,40 @@ class ObservabilityLoggingSink : public LoggingSink {
uint32_t max_message_bytes = 0;
};
void RegisterEnvironmentResource(
const EnvironmentAutoDetect::ResourceType* resource);
// Flushes the currently stored entries.
void Flush();
void FlushEntriesHelper(
google::logging::v2::LoggingServiceV2::StubInterface* stub,
std::vector<Entry> entries,
const EnvironmentAutoDetect::ResourceType* resource);
void MaybeTriggerFlush();
void MaybeTriggerFlushLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
std::vector<Configuration> client_configs_;
std::vector<Configuration> server_configs_;
std::string project_id_;
const std::string project_id_;
std::string authority_;
std::vector<std::pair<std::string, std::string>> labels_;
absl::once_flag once_;
std::unique_ptr<google::logging::v2::LoggingServiceV2::StubInterface> stub_;
const std::vector<std::pair<std::string, std::string>> labels_;
grpc_core::Mutex mu_;
bool registered_env_fetch_notification_ = false;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> ABSL_GUARDED_BY(
mu_) event_engine_;
std::unique_ptr<google::logging::v2::LoggingServiceV2::StubInterface> stub_
ABSL_GUARDED_BY(mu_);
std::vector<Entry> entries_ ABSL_GUARDED_BY(mu_);
uint64_t entries_memory_footprint_ ABSL_GUARDED_BY(mu_) = 0;
const EnvironmentAutoDetect::ResourceType* resource_ ABSL_GUARDED_BY(mu_) =
nullptr;
bool flush_triggered_ ABSL_GUARDED_BY(mu_) = false;
bool flush_in_progress_ ABSL_GUARDED_BY(mu_) = false;
bool flush_timer_in_progress_ ABSL_GUARDED_BY(mu_) = false;
bool sink_closed_ ABSL_GUARDED_BY(mu_) = false;
grpc_core::CondVar sink_flushed_after_close_;
};
// Exposed just for testing purposes

Loading…
Cancel
Save