diff --git a/include/grpcpp/ext/gcp_observability.h b/include/grpcpp/ext/gcp_observability.h index 942a56392b5..16d51512b0d 100644 --- a/include/grpcpp/ext/gcp_observability.h +++ b/include/grpcpp/ext/gcp_observability.h @@ -25,6 +25,10 @@ namespace experimental { // Initialize GCP Observability for gRPC. absl::Status GcpObservabilityInit(); +// Gracefully shuts down GCP Observability. +// Note that graceful shutdown for stats and tracing is not yet supported. +void GcpObservabilityClose(); + } // namespace experimental } // namespace grpc diff --git a/src/cpp/ext/filters/logging/BUILD b/src/cpp/ext/filters/logging/BUILD index 8d7b3440224..1cb871aecb8 100644 --- a/src/cpp/ext/filters/logging/BUILD +++ b/src/cpp/ext/filters/logging/BUILD @@ -86,5 +86,6 @@ grpc_cc_library( "//src/core:poll", "//src/core:slice", "//src/core:slice_buffer", + "//src/core:time", ], ) diff --git a/src/cpp/ext/filters/logging/logging_filter.cc b/src/cpp/ext/filters/logging/logging_filter.cc index 79543ed1bc4..30dfed36f0d 100644 --- a/src/cpp/ext/filters/logging/logging_filter.cc +++ b/src/cpp/ext/filters/logging/logging_filter.cc @@ -54,6 +54,7 @@ #include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/gprpp/time.h" #include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/promise/cancel_callback.h" #include "src/core/lib/promise/context.h" @@ -311,6 +312,7 @@ class CallData { entry->peer = peer_; entry->service_name = service_name_; entry->method_name = method_name_; + entry->timestamp = grpc_core::Timestamp::Now(); } uint64_t call_id_; uint32_t sequence_id_ = 0; diff --git a/src/cpp/ext/filters/logging/logging_sink.h b/src/cpp/ext/filters/logging/logging_sink.h index b7f3292990a..471ac171b39 100644 --- a/src/cpp/ext/filters/logging/logging_sink.h +++ b/src/cpp/ext/filters/logging/logging_sink.h @@ -102,6 +102,7 @@ class LoggingSink { std::string authority; 
std::string service_name; std::string method_name; + grpc_core::Timestamp timestamp; }; virtual ~LoggingSink() = default; diff --git a/src/cpp/ext/gcp/BUILD b/src/cpp/ext/gcp/BUILD index 8957c7a5829..1b691a9f8a0 100644 --- a/src/cpp/ext/gcp/BUILD +++ b/src/cpp/ext/gcp/BUILD @@ -101,7 +101,7 @@ grpc_cc_library( "observability_logging_sink.h", ], external_deps = [ - "absl/base", + "absl/base:core_headers", "absl/strings", "absl/strings:str_format", "absl/types:optional", @@ -113,11 +113,14 @@ grpc_cc_library( language = "c++", visibility = ["//test:__subpackages__"], deps = [ + "environment_autodetect", "observability_config", + "//:event_engine_base_hdrs", "//:gpr", "//:gpr_platform", "//:grpc++", "//:grpc_opencensus_plugin", + "//src/core:default_event_engine", "//src/core:env", "//src/core:json", "//src/core:time", diff --git a/src/cpp/ext/gcp/observability.cc b/src/cpp/ext/gcp/observability.cc index 7644117623a..dc456646bb2 100644 --- a/src/cpp/ext/gcp/observability.cc +++ b/src/cpp/ext/gcp/observability.cc @@ -51,6 +51,9 @@ namespace grpc { namespace experimental { namespace { + +grpc::internal::ObservabilityLoggingSink* g_logging_sink = nullptr; + // TODO(yashykt): These constants are currently derived from the example at // https://cloud.google.com/traffic-director/docs/observability-proxyless#c++. // We might want these to be configurable. @@ -89,7 +92,6 @@ absl::Status GcpObservabilityInit() { !config->cloud_logging.has_value()) { return absl::OkStatus(); } - grpc::internal::OpenCensusRegistry::Get().RegisterWaitOnReady(); grpc::internal::EnvironmentAutoDetect::Create(config->project_id); grpc::RegisterOpenCensusPlugin(); if (config->cloud_trace.has_value()) { @@ -136,6 +138,7 @@ absl::Status GcpObservabilityInit() { // If tracing or monitoring is enabled, we need to get the OpenCensus plugin // to wait for the environment to be autodetected. 
if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) { + grpc::internal::OpenCensusRegistry::Get().RegisterWaitOnReady(); grpc::internal::OpenCensusRegistry::Get().RegisterFunctions( [config_labels = config->labels]() mutable { grpc::internal::EnvironmentAutoDetect::Get().NotifyOnDone( @@ -152,12 +155,18 @@ absl::Status GcpObservabilityInit() { }); } if (config->cloud_logging.has_value()) { - grpc::internal::RegisterLoggingFilter( - new grpc::internal::ObservabilityLoggingSink( - config->cloud_logging.value(), config->project_id, config->labels)); + g_logging_sink = new grpc::internal::ObservabilityLoggingSink( + config->cloud_logging.value(), config->project_id, config->labels); + grpc::internal::RegisterLoggingFilter(g_logging_sink); } return absl::OkStatus(); } +void GcpObservabilityClose() { + if (g_logging_sink != nullptr) { + g_logging_sink->FlushAndClose(); + } +} + } // namespace experimental } // namespace grpc diff --git a/src/cpp/ext/gcp/observability_logging_sink.cc b/src/cpp/ext/gcp/observability_logging_sink.cc index d48fa288663..f320e9e49dc 100644 --- a/src/cpp/ext/gcp/observability_logging_sink.cc +++ b/src/cpp/ext/gcp/observability_logging_sink.cc @@ -36,6 +36,7 @@ #include "google/logging/v2/log_entry.pb.h" #include "google/logging/v2/logging.grpc.pb.h" #include "google/logging/v2/logging.pb.h" +#include "google/protobuf/text_format.h" #include #include @@ -44,6 +45,7 @@ #include #include +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/env.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/json/json.h" @@ -228,62 +230,211 @@ void EntryToJsonStructProto(LoggingSink::Entry entry, std::move(entry.method_name)); } +namespace { + +uint64_t EstimateEntrySize(const LoggingSink::Entry& entry) { + uint64_t size = sizeof(entry); + for (const auto& pair : entry.payload.metadata) { + size += pair.first.size() + pair.second.size(); + } + size += entry.payload.status_message.size(); + 
size += entry.payload.status_details.size(); + size += entry.payload.message.size(); + size += entry.authority.size(); + size += entry.service_name.size(); + size += entry.method_name.size(); + return size; +} + +} // namespace + void ObservabilityLoggingSink::LogEntry(Entry entry) { - absl::call_once(once_, [this]() { - std::string endpoint; - absl::optional endpoint_env = - grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); - if (endpoint_env.has_value() && !endpoint_env->empty()) { - endpoint = std::move(*endpoint_env); - } else { - endpoint = "logging.googleapis.com"; + auto entry_size = EstimateEntrySize(entry); + grpc_core::MutexLock lock(&mu_); + if (sink_closed_) return; + entries_.push_back(std::move(entry)); + entries_memory_footprint_ += entry_size; + MaybeTriggerFlushLocked(); +} + +void ObservabilityLoggingSink::RegisterEnvironmentResource( + const EnvironmentAutoDetect::ResourceType* resource) { + grpc_core::MutexLock lock(&mu_); + resource_ = resource; + MaybeTriggerFlushLocked(); +} + +void ObservabilityLoggingSink::FlushAndClose() { + grpc_core::MutexLock lock(&mu_); + sink_closed_ = true; + if (entries_.empty()) return; + MaybeTriggerFlushLocked(); + sink_flushed_after_close_.Wait(&mu_); +} + +void ObservabilityLoggingSink::Flush() { + std::vector entries; + google::logging::v2::LoggingServiceV2::StubInterface* stub = nullptr; + const EnvironmentAutoDetect::ResourceType* resource = nullptr; + { + grpc_core::MutexLock lock(&mu_); + if (flush_in_progress_) { + return; + } + flush_in_progress_ = true; + flush_timer_in_progress_ = false; + flush_triggered_ = false; + if (stub_ == nullptr) { + std::string endpoint; + absl::optional endpoint_env = + grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); + if (endpoint_env.has_value() && !endpoint_env->empty()) { + endpoint = std::move(*endpoint_env); + } else { + endpoint = "logging.googleapis.com"; + } + ChannelArguments args; + // Disable observability for RPCs on this 
channel + args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); + // Set keepalive time to 24 hrs to effectively disable keepalive ping, but + // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect. + args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, + 24 * 60 * 60 * 1000 /* 24 hours */); + args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */); + stub_ = google::logging::v2::LoggingServiceV2::NewStub( + CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args)); } - ChannelArguments args; - // Disable observability for RPCs on this channel - args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); - // Set keepalive time to 24 hrs to effectively disable keepalive ping, but - // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect. - args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 24 * 60 * 60 * 1000 /* 24 hours */); - args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */); - stub_ = google::logging::v2::LoggingServiceV2::NewStub( - CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args)); - }); + stub = stub_.get(); + entries = std::move(entries_); + entries_memory_footprint_ = 0; + resource = resource_; + } + FlushEntriesHelper(stub, std::move(entries), resource); +} + +void ObservabilityLoggingSink::FlushEntriesHelper( + google::logging::v2::LoggingServiceV2::StubInterface* stub, + std::vector entries, + const EnvironmentAutoDetect::ResourceType* resource) { + if (entries.empty()) { + return; + } struct CallContext { ClientContext context; google::logging::v2::WriteLogEntriesRequest request; google::logging::v2::WriteLogEntriesResponse response; }; - // TODO(yashykt): Implement batching so that we can batch a bunch of log - // entries into a single entry. Also, set a reasonable deadline on the - // context, and actually use the entry. 
CallContext* call = new CallContext; call->context.set_authority(authority_); + call->context.set_deadline( + (grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30)) + .as_timespec(GPR_CLOCK_MONOTONIC)); call->request.set_log_name( absl::StrFormat("projects/%s/logs/" "microservices.googleapis.com%%2Fobservability%%2fgrpc", project_id_)); (*call->request.mutable_labels()).insert(labels_.begin(), labels_.end()); - // TODO(yashykt): Figure out the proper resource type and labels. - call->request.mutable_resource()->set_type("global"); - auto* proto_entry = call->request.add_entries(); - // Fill the current timestamp - gpr_timespec timespec = - grpc_core::Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME); - proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec); - proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec); - // TODO(yashykt): Check if we need to fill receive timestamp - EntryToJsonStructProto(std::move(entry), proto_entry->mutable_json_payload()); - stub_->async()->WriteLogEntries( + // Set the proper resource type and labels. + call->request.mutable_resource()->set_type(resource->resource_type); + call->request.mutable_resource()->mutable_labels()->insert( + resource->labels.begin(), resource->labels.end()); + for (auto& entry : entries) { + auto* proto_entry = call->request.add_entries(); + gpr_timespec timespec = entry.timestamp.as_timespec(GPR_CLOCK_REALTIME); + proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec); + proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec); + // TODO(yashykt): Check if we need to fill receive timestamp + EntryToJsonStructProto(std::move(entry), + proto_entry->mutable_json_payload()); + } + stub->async()->WriteLogEntries( &(call->context), &(call->request), &(call->response), - [call](Status status) { + [this, call](Status status) { if (!status.ok()) { - // TODO(yashykt): Log the contents of the - // request on a failure. 
- gpr_log(GPR_ERROR, "GCP Observability Logging Error %d: %s", - status.error_code(), status.error_message().c_str()); + gpr_log( + GPR_ERROR, + "GCP Observability Logging Error %d: %s. Dumping log entries.", + status.error_code(), status.error_message().c_str()); + for (auto& entry : call->request.entries()) { + std::string output; + ::google::protobuf::TextFormat::PrintToString(entry.json_payload(), + &output); + gpr_log( + GPR_INFO, "Log Entry recorded at time: %s : %s", + grpc_core::Timestamp::FromTimespecRoundUp( + gpr_timespec{entry.timestamp().seconds(), + entry.timestamp().nanos(), GPR_CLOCK_REALTIME}) + .ToString() + .c_str(), + output.c_str()); + } } delete call; + grpc_core::MutexLock lock(&mu_); + flush_in_progress_ = false; + if (sink_closed_ && entries_.empty()) { + sink_flushed_after_close_.SignalAll(); + } else { + MaybeTriggerFlushLocked(); + } + }); +} + +void ObservabilityLoggingSink::MaybeTriggerFlush() { + grpc_core::MutexLock lock(&mu_); + return MaybeTriggerFlushLocked(); +} + +void ObservabilityLoggingSink::MaybeTriggerFlushLocked() { + constexpr int kMaxEntriesBeforeDump = 100000; + constexpr int kMaxMemoryFootprintBeforeDump = 10 * 1024 * 1024; + constexpr int kMinEntriesBeforeFlush = 1000; + constexpr int kMinMemoryFootprintBeforeFlush = 1 * 1024 * 1024; + // Use this opportunity to fetch environment resource if not fetched already + if (resource_ == nullptr && !registered_env_fetch_notification_) { + auto& env_autodetect = EnvironmentAutoDetect::Get(); + resource_ = env_autodetect.resource(); + event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine(); + if (resource_ == nullptr) { + registered_env_fetch_notification_ = true; + env_autodetect.NotifyOnDone([this]() { + RegisterEnvironmentResource(EnvironmentAutoDetect::Get().resource()); }); + } + } + if (entries_.empty()) return; + if (entries_.size() > kMaxEntriesBeforeDump || + entries_memory_footprint_ > kMaxMemoryFootprintBeforeDump) { + // Buffer limits have been 
reached. Dump entries with gpr_log + gpr_log(GPR_INFO, "Buffer limit reached. Dumping log entries."); + for (auto& entry : entries_) { + google::protobuf::Struct proto; + std::string timestamp = entry.timestamp.ToString(); + EntryToJsonStructProto(std::move(entry), &proto); + std::string output; + ::google::protobuf::TextFormat::PrintToString(proto, &output); + gpr_log(GPR_INFO, "Log Entry recorded at time: %s : %s", + timestamp.c_str(), output.c_str()); + } + entries_.clear(); + entries_memory_footprint_ = 0; + } else if (resource_ != nullptr && !flush_in_progress_) { + // Environment resource has been detected. Trigger flush if conditions + // suffice. + if ((entries_.size() >= kMinEntriesBeforeFlush || + entries_memory_footprint_ >= kMinMemoryFootprintBeforeFlush || + sink_closed_) && + !flush_triggered_) { + // It is fine even if there were a flush with a timer in progress. What is + // important is that a flush is triggered. + flush_triggered_ = true; + event_engine_->Run([this]() { Flush(); }); + } else if (!flush_timer_in_progress_) { + flush_timer_in_progress_ = true; + event_engine_->RunAfter(grpc_core::Duration::Seconds(1), + [this]() { Flush(); }); + } + } } ObservabilityLoggingSink::Configuration::Configuration( diff --git a/src/cpp/ext/gcp/observability_logging_sink.h b/src/cpp/ext/gcp/observability_logging_sink.h index 8b29b20a09d..694757304f4 100644 --- a/src/cpp/ext/gcp/observability_logging_sink.h +++ b/src/cpp/ext/gcp/observability_logging_sink.h @@ -31,11 +31,15 @@ #include -#include "absl/base/call_once.h" +#include "absl/base/thread_annotations.h" #include "absl/strings/string_view.h" #include "google/logging/v2/logging.grpc.pb.h" +#include + +#include "src/core/lib/gprpp/sync.h" #include "src/cpp/ext/filters/logging/logging_sink.h" +#include "src/cpp/ext/gcp/environment_autodetect.h" #include "src/cpp/ext/gcp/observability_config.h" namespace grpc { @@ -55,6 +59,10 @@ class ObservabilityLoggingSink : public LoggingSink { void LogEntry(Entry 
entry) override; + // Triggers a final flush of all the currently buffered logging entries and + // closes the sink, preventing any more entries from being logged. + void FlushAndClose(); + private: struct Configuration { explicit Configuration( @@ -70,13 +78,40 @@ class ObservabilityLoggingSink : public LoggingSink { uint32_t max_message_bytes = 0; }; + void RegisterEnvironmentResource( + const EnvironmentAutoDetect::ResourceType* resource); + + // Flushes the currently buffered entries. Returns without doing anything if + // another flush is already in progress. + void Flush(); + void FlushEntriesHelper( + google::logging::v2::LoggingServiceV2::StubInterface* stub, + std::vector entries, + const EnvironmentAutoDetect::ResourceType* resource); + + void MaybeTriggerFlush(); + void MaybeTriggerFlushLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + std::vector client_configs_; std::vector server_configs_; - std::string project_id_; + const std::string project_id_; std::string authority_; - std::vector> labels_; - absl::once_flag once_; - std::unique_ptr stub_; + const std::vector> labels_; + grpc_core::Mutex mu_; + bool registered_env_fetch_notification_ = false; + std::shared_ptr ABSL_GUARDED_BY( + mu_) event_engine_; + std::unique_ptr stub_ + ABSL_GUARDED_BY(mu_); + std::vector entries_ ABSL_GUARDED_BY(mu_); + uint64_t entries_memory_footprint_ ABSL_GUARDED_BY(mu_) = 0; + const EnvironmentAutoDetect::ResourceType* resource_ ABSL_GUARDED_BY(mu_) = + nullptr; + bool flush_triggered_ ABSL_GUARDED_BY(mu_) = false; + bool flush_in_progress_ ABSL_GUARDED_BY(mu_) = false; + bool flush_timer_in_progress_ ABSL_GUARDED_BY(mu_) = false; + bool sink_closed_ ABSL_GUARDED_BY(mu_) = false; + grpc_core::CondVar sink_flushed_after_close_; }; // Exposed for just for testing purposes