From bf23bb2fa75e9c3cf43e262e07ce7b7e0e3eae6c Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 22 Feb 2023 22:32:44 -0800 Subject: [PATCH] Gcp Observability Logging: Batching and Graceful close (#32436) This PR adds batching support for GCP Observability logging. So instead of the naive creating a new RPC to cloud logging for each logging event, we now batch the log events to meet one of the following requirements - * Batch size of 1000 * Batch memory consumption of 1MB * A timeout period of 1sec after which we flush the accumulated batch irrespective of the size. There can also be cases where for some reason the RPCs fail or the batch just accumulates to a very large size(100000 entries or 10MB in size). In such cases, we just log the events with gpr_log instead of just continuing to accumulate. Additionally, `GcpObservabilityClose()` has been added to gracefully shut off logging where we block till all the currently logged events are flushed. (We might be able to gracefully shut off stats and tracing in the future too.) --- include/grpcpp/ext/gcp_observability.h | 4 + src/cpp/ext/filters/logging/BUILD | 1 + src/cpp/ext/filters/logging/logging_filter.cc | 2 + src/cpp/ext/filters/logging/logging_sink.h | 1 + src/cpp/ext/gcp/BUILD | 5 +- src/cpp/ext/gcp/observability.cc | 17 +- src/cpp/ext/gcp/observability_logging_sink.cc | 225 +++++++++++++++--- src/cpp/ext/gcp/observability_logging_sink.h | 45 +++- 8 files changed, 253 insertions(+), 47 deletions(-) diff --git a/include/grpcpp/ext/gcp_observability.h b/include/grpcpp/ext/gcp_observability.h index 942a56392b5..16d51512b0d 100644 --- a/include/grpcpp/ext/gcp_observability.h +++ b/include/grpcpp/ext/gcp_observability.h @@ -25,6 +25,10 @@ namespace experimental { // Initialize GCP Observability for gRPC. absl::Status GcpObservabilityInit(); +// Gracefully shuts down GCP Observability. +// Note that graceful shutdown for stats and tracing is not yet supported. +void GcpObservabilityClose(); + } // namespace experimental } // namespace grpc diff --git a/src/cpp/ext/filters/logging/BUILD b/src/cpp/ext/filters/logging/BUILD index 8d7b3440224..1cb871aecb8 100644 --- a/src/cpp/ext/filters/logging/BUILD +++ b/src/cpp/ext/filters/logging/BUILD @@ -86,5 +86,6 @@ grpc_cc_library( "//src/core:poll", "//src/core:slice", "//src/core:slice_buffer", + "//src/core:time", ], ) diff --git a/src/cpp/ext/filters/logging/logging_filter.cc b/src/cpp/ext/filters/logging/logging_filter.cc index 79543ed1bc4..30dfed36f0d 100644 --- a/src/cpp/ext/filters/logging/logging_filter.cc +++ b/src/cpp/ext/filters/logging/logging_filter.cc @@ -54,6 +54,7 @@ #include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/gprpp/time.h" #include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/promise/cancel_callback.h" #include "src/core/lib/promise/context.h" @@ -311,6 +312,7 @@ class CallData { entry->peer = peer_; entry->service_name = service_name_; entry->method_name = method_name_; + entry->timestamp = grpc_core::Timestamp::Now(); } uint64_t call_id_; uint32_t sequence_id_ = 0; diff --git a/src/cpp/ext/filters/logging/logging_sink.h b/src/cpp/ext/filters/logging/logging_sink.h index b7f3292990a..471ac171b39 100644 --- a/src/cpp/ext/filters/logging/logging_sink.h +++ b/src/cpp/ext/filters/logging/logging_sink.h @@ -102,6 +102,7 @@ class LoggingSink { std::string authority; std::string service_name; std::string method_name; + grpc_core::Timestamp timestamp; }; virtual ~LoggingSink() = default; diff --git a/src/cpp/ext/gcp/BUILD b/src/cpp/ext/gcp/BUILD index 8957c7a5829..1b691a9f8a0 100644 --- a/src/cpp/ext/gcp/BUILD +++ b/src/cpp/ext/gcp/BUILD @@ -101,7 +101,7 @@ grpc_cc_library( "observability_logging_sink.h", ], external_deps = [ - "absl/base", + "absl/base:core_headers", "absl/strings", "absl/strings:str_format", "absl/types:optional", @@ -113,11 +113,14 @@ grpc_cc_library( language = "c++", visibility = ["//test:__subpackages__"], deps = [ + "environment_autodetect", "observability_config", + "//:event_engine_base_hdrs", "//:gpr", "//:gpr_platform", "//:grpc++", "//:grpc_opencensus_plugin", + "//src/core:default_event_engine", "//src/core:env", "//src/core:json", "//src/core:time", diff --git a/src/cpp/ext/gcp/observability.cc b/src/cpp/ext/gcp/observability.cc index 7644117623a..dc456646bb2 100644 --- a/src/cpp/ext/gcp/observability.cc +++ b/src/cpp/ext/gcp/observability.cc @@ -51,6 +51,9 @@ namespace grpc { namespace experimental { namespace { + +grpc::internal::ObservabilityLoggingSink* g_logging_sink = nullptr; + // TODO(yashykt): These constants are currently derived from the example at // https://cloud.google.com/traffic-director/docs/observability-proxyless#c++. // We might want these to be configurable. @@ -89,7 +92,6 @@ absl::Status GcpObservabilityInit() { !config->cloud_logging.has_value()) { return absl::OkStatus(); } - grpc::internal::OpenCensusRegistry::Get().RegisterWaitOnReady(); grpc::internal::EnvironmentAutoDetect::Create(config->project_id); grpc::RegisterOpenCensusPlugin(); if (config->cloud_trace.has_value()) { @@ -136,6 +138,7 @@ absl::Status GcpObservabilityInit() { // If tracing or monitoring is enabled, we need to get the OpenCensus plugin // to wait for the environment to be autodetected. if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) { + grpc::internal::OpenCensusRegistry::Get().RegisterWaitOnReady(); grpc::internal::OpenCensusRegistry::Get().RegisterFunctions( [config_labels = config->labels]() mutable { grpc::internal::EnvironmentAutoDetect::Get().NotifyOnDone( @@ -152,12 +155,18 @@ absl::Status GcpObservabilityInit() { }); } if (config->cloud_logging.has_value()) { - grpc::internal::RegisterLoggingFilter( - new grpc::internal::ObservabilityLoggingSink( - config->cloud_logging.value(), config->project_id, config->labels)); + g_logging_sink = new grpc::internal::ObservabilityLoggingSink( + config->cloud_logging.value(), config->project_id, config->labels); + grpc::internal::RegisterLoggingFilter(g_logging_sink); } return absl::OkStatus(); } +void GcpObservabilityClose() { + if (g_logging_sink != nullptr) { + g_logging_sink->FlushAndClose(); + } +} + } // namespace experimental } // namespace grpc diff --git a/src/cpp/ext/gcp/observability_logging_sink.cc b/src/cpp/ext/gcp/observability_logging_sink.cc index d48fa288663..f320e9e49dc 100644 --- a/src/cpp/ext/gcp/observability_logging_sink.cc +++ b/src/cpp/ext/gcp/observability_logging_sink.cc @@ -36,6 +36,7 @@ #include "google/logging/v2/log_entry.pb.h" #include "google/logging/v2/logging.grpc.pb.h" #include "google/logging/v2/logging.pb.h" +#include "google/protobuf/text_format.h" #include #include @@ -44,6 +45,7 @@ #include #include +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/env.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/json/json.h" @@ -228,62 +230,211 @@ void EntryToJsonStructProto(LoggingSink::Entry entry, std::move(entry.method_name)); } +namespace { + +uint64_t EstimateEntrySize(const LoggingSink::Entry& entry) { + uint64_t size = sizeof(entry); + for (const auto& pair : entry.payload.metadata) { + size += pair.first.size() + pair.second.size(); + } + size += entry.payload.status_message.size(); + size += entry.payload.status_details.size(); + size += entry.payload.message.size(); + size += entry.authority.size(); + size += entry.service_name.size(); + size += entry.method_name.size(); + return size; +} + +} // namespace + void ObservabilityLoggingSink::LogEntry(Entry entry) { - absl::call_once(once_, [this]() { - std::string endpoint; - absl::optional endpoint_env = - grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); - if (endpoint_env.has_value() && !endpoint_env->empty()) { - endpoint = std::move(*endpoint_env); - } else { - endpoint = "logging.googleapis.com"; + auto entry_size = EstimateEntrySize(entry); + grpc_core::MutexLock lock(&mu_); + if (sink_closed_) return; + entries_.push_back(std::move(entry)); + entries_memory_footprint_ += entry_size; + MaybeTriggerFlushLocked(); +} + +void ObservabilityLoggingSink::RegisterEnvironmentResource( + const EnvironmentAutoDetect::ResourceType* resource) { + grpc_core::MutexLock lock(&mu_); + resource_ = resource; + MaybeTriggerFlushLocked(); +} + +void ObservabilityLoggingSink::FlushAndClose() { + grpc_core::MutexLock lock(&mu_); + sink_closed_ = true; + if (entries_.empty()) return; + MaybeTriggerFlushLocked(); + sink_flushed_after_close_.Wait(&mu_); +} + +void ObservabilityLoggingSink::Flush() { + std::vector entries; + google::logging::v2::LoggingServiceV2::StubInterface* stub = nullptr; + const EnvironmentAutoDetect::ResourceType* resource = nullptr; + { + grpc_core::MutexLock lock(&mu_); + if (flush_in_progress_) { + return; + } + flush_in_progress_ = true; + flush_timer_in_progress_ = false; + flush_triggered_ = false; + if (stub_ == nullptr) { + std::string endpoint; + absl::optional endpoint_env = + grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); + if (endpoint_env.has_value() && !endpoint_env->empty()) { + endpoint = std::move(*endpoint_env); + } else { + endpoint = "logging.googleapis.com"; + } + ChannelArguments args; + // Disable observability for RPCs on this channel + args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); + // Set keepalive time to 24 hrs to effectively disable keepalive ping, but + // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect. + args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, + 24 * 60 * 60 * 1000 /* 24 hours */); + args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */); + stub_ = google::logging::v2::LoggingServiceV2::NewStub( + CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args)); } - ChannelArguments args; - // Disable observability for RPCs on this channel - args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); - // Set keepalive time to 24 hrs to effectively disable keepalive ping, but - // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect. - args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 24 * 60 * 60 * 1000 /* 24 hours */); - args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */); - stub_ = google::logging::v2::LoggingServiceV2::NewStub( - CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args)); - }); + stub = stub_.get(); + entries = std::move(entries_); + entries_memory_footprint_ = 0; + resource = resource_; + } + FlushEntriesHelper(stub, std::move(entries), resource); +} + +void ObservabilityLoggingSink::FlushEntriesHelper( + google::logging::v2::LoggingServiceV2::StubInterface* stub, + std::vector entries, + const EnvironmentAutoDetect::ResourceType* resource) { + if (entries.empty()) { + return; + } struct CallContext { ClientContext context; google::logging::v2::WriteLogEntriesRequest request; google::logging::v2::WriteLogEntriesResponse response; }; - // TODO(yashykt): Implement batching so that we can batch a bunch of log - // entries into a single entry. Also, set a reasonable deadline on the - // context, and actually use the entry. CallContext* call = new CallContext; call->context.set_authority(authority_); + call->context.set_deadline( + (grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30)) + .as_timespec(GPR_CLOCK_MONOTONIC)); call->request.set_log_name( absl::StrFormat("projects/%s/logs/" "microservices.googleapis.com%%2Fobservability%%2fgrpc", project_id_)); (*call->request.mutable_labels()).insert(labels_.begin(), labels_.end()); - // TODO(yashykt): Figure out the proper resource type and labels. - call->request.mutable_resource()->set_type("global"); - auto* proto_entry = call->request.add_entries(); - // Fill the current timestamp - gpr_timespec timespec = - grpc_core::Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME); - proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec); - proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec); - // TODO(yashykt): Check if we need to fill receive timestamp - EntryToJsonStructProto(std::move(entry), proto_entry->mutable_json_payload()); - stub_->async()->WriteLogEntries( + // Set the proper resource type and labels. + call->request.mutable_resource()->set_type(resource->resource_type); + call->request.mutable_resource()->mutable_labels()->insert( + resource->labels.begin(), resource->labels.end()); + for (auto& entry : entries) { + auto* proto_entry = call->request.add_entries(); + gpr_timespec timespec = entry.timestamp.as_timespec(GPR_CLOCK_REALTIME); + proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec); + proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec); + // TODO(yashykt): Check if we need to fill receive timestamp + EntryToJsonStructProto(std::move(entry), + proto_entry->mutable_json_payload()); + } + stub->async()->WriteLogEntries( &(call->context), &(call->request), &(call->response), - [call](Status status) { + [this, call](Status status) { if (!status.ok()) { - // TODO(yashykt): Log the contents of the - // request on a failure. - gpr_log(GPR_ERROR, "GCP Observability Logging Error %d: %s", - status.error_code(), status.error_message().c_str()); + gpr_log( + GPR_ERROR, + "GCP Observability Logging Error %d: %s. Dumping log entries.", + status.error_code(), status.error_message().c_str()); + for (auto& entry : call->request.entries()) { + std::string output; + ::google::protobuf::TextFormat::PrintToString(entry.json_payload(), + &output); + gpr_log( + GPR_INFO, "Log Entry recorded at time: %s : %s", + grpc_core::Timestamp::FromTimespecRoundUp( + gpr_timespec{entry.timestamp().seconds(), + entry.timestamp().nanos(), GPR_CLOCK_REALTIME}) + .ToString() + .c_str(), + output.c_str()); + } } delete call; + grpc_core::MutexLock lock(&mu_); + flush_in_progress_ = false; + if (sink_closed_ && entries_.empty()) { + sink_flushed_after_close_.SignalAll(); + } else { + MaybeTriggerFlushLocked(); + } + }); +} + +void ObservabilityLoggingSink::MaybeTriggerFlush() { + grpc_core::MutexLock lock(&mu_); + return MaybeTriggerFlushLocked(); +} + +void ObservabilityLoggingSink::MaybeTriggerFlushLocked() { + constexpr int kMaxEntriesBeforeDump = 100000; + constexpr int kMaxMemoryFootprintBeforeDump = 10 * 1024 * 1024; + constexpr int kMinEntriesBeforeFlush = 1000; + constexpr int kMinMemoryFootprintBeforeFlush = 1 * 1024 * 1024; + // Use this opportunity to fetch environment resource if not fetched already + if (resource_ == nullptr && !registered_env_fetch_notification_) { + auto& env_autodetect = EnvironmentAutoDetect::Get(); + resource_ = env_autodetect.resource(); + event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine(); + if (resource_ == nullptr) { + registered_env_fetch_notification_ = true; + env_autodetect.NotifyOnDone([this]() { + RegisterEnvironmentResource(EnvironmentAutoDetect::Get().resource()); }); + } + } + if (entries_.empty()) return; + if (entries_.size() > kMaxEntriesBeforeDump || + entries_memory_footprint_ > kMaxMemoryFootprintBeforeDump) { + // Buffer limits have been reached. Dump entries with gpr_log + gpr_log(GPR_INFO, "Buffer limit reached. Dumping log entries."); + for (auto& entry : entries_) { + google::protobuf::Struct proto; + std::string timestamp = entry.timestamp.ToString(); + EntryToJsonStructProto(std::move(entry), &proto); + std::string output; + ::google::protobuf::TextFormat::PrintToString(proto, &output); + gpr_log(GPR_INFO, "Log Entry recorded at time: %s : %s", + timestamp.c_str(), output.c_str()); + } + entries_.clear(); + entries_memory_footprint_ = 0; + } else if (resource_ != nullptr && !flush_in_progress_) { + // Environment resource has been detected. Trigger flush if conditions + // suffice. + if ((entries_.size() >= kMinEntriesBeforeFlush || + entries_memory_footprint_ >= kMinMemoryFootprintBeforeFlush || + sink_closed_) && + !flush_triggered_) { + // It is fine even if there were a flush with a timer in progress. What is + // important is that a flush is triggered. + flush_triggered_ = true; + event_engine_->Run([this]() { Flush(); }); + } else if (!flush_timer_in_progress_) { + flush_timer_in_progress_ = true; + event_engine_->RunAfter(grpc_core::Duration::Seconds(1), + [this]() { Flush(); }); + } + } } ObservabilityLoggingSink::Configuration::Configuration( diff --git a/src/cpp/ext/gcp/observability_logging_sink.h b/src/cpp/ext/gcp/observability_logging_sink.h index 8b29b20a09d..694757304f4 100644 --- a/src/cpp/ext/gcp/observability_logging_sink.h +++ b/src/cpp/ext/gcp/observability_logging_sink.h @@ -31,11 +31,15 @@ #include -#include "absl/base/call_once.h" +#include "absl/base/thread_annotations.h" #include "absl/strings/string_view.h" #include "google/logging/v2/logging.grpc.pb.h" +#include + +#include "src/core/lib/gprpp/sync.h" #include "src/cpp/ext/filters/logging/logging_sink.h" +#include "src/cpp/ext/gcp/environment_autodetect.h" #include "src/cpp/ext/gcp/observability_config.h" namespace grpc { @@ -55,6 +59,10 @@ class ObservabilityLoggingSink : public LoggingSink { void LogEntry(Entry entry) override; + // Triggers a final flush of all the currently buffered logging entries and + // closes the sink preventing any more entries to be logged. + void FlushAndClose(); + private: struct Configuration { explicit Configuration( @@ -70,13 +78,40 @@ class ObservabilityLoggingSink : public LoggingSink { uint32_t max_message_bytes = 0; }; + void RegisterEnvironmentResource( + const EnvironmentAutoDetect::ResourceType* resource); + + // Flushes the currently stored entries. \a timed_flush denotes whether this + // Flush was triggered from a timer. + void Flush(); + void FlushEntriesHelper( + google::logging::v2::LoggingServiceV2::StubInterface* stub, + std::vector entries, + const EnvironmentAutoDetect::ResourceType* resource); + + void MaybeTriggerFlush(); + void MaybeTriggerFlushLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + std::vector client_configs_; std::vector server_configs_; - std::string project_id_; + const std::string project_id_; std::string authority_; - std::vector> labels_; - absl::once_flag once_; - std::unique_ptr stub_; + const std::vector> labels_; + grpc_core::Mutex mu_; + bool registered_env_fetch_notification_ = false; + std::shared_ptr ABSL_GUARDED_BY( + mu_) event_engine_; + std::unique_ptr stub_ + ABSL_GUARDED_BY(mu_); + std::vector entries_ ABSL_GUARDED_BY(mu_); + uint64_t entries_memory_footprint_ ABSL_GUARDED_BY(mu_) = 0; + const EnvironmentAutoDetect::ResourceType* resource_ ABSL_GUARDED_BY(mu_) = + nullptr; + bool flush_triggered_ ABSL_GUARDED_BY(mu_) = false; + bool flush_in_progress_ ABSL_GUARDED_BY(mu_) = false; + bool flush_timer_in_progress_ ABSL_GUARDED_BY(mu_) = false; + bool sink_closed_ ABSL_GUARDED_BY(mu_) = false; + grpc_core::CondVar sink_flushed_after_close_; }; // Exposed for just for testing purposes