From 2146107e25de48b5090b210441ddae7bd0edd8a5 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 15 Nov 2022 18:02:22 -0800 Subject: [PATCH] Observability Logging: Interfaces and structure (#31651) * Observability Logging: Interfaces and structure * Revert unrelated changes * Fix test build * Add dependency on Google Apis Logging V2 service and add a dummy logging call * Add authority --- bazel/grpc_deps.bzl | 5 ++ src/cpp/ext/filters/logging/BUILD | 1 + src/cpp/ext/filters/logging/logging_sink.h | 49 +++++++++++++++ src/cpp/ext/gcp/BUILD | 5 ++ src/cpp/ext/gcp/observability_logging_sink.cc | 61 +++++++++++++++++-- src/cpp/ext/gcp/observability_logging_sink.h | 18 ++++-- .../gcp/observability_logging_sink_test.cc | 20 +++--- 7 files changed, 141 insertions(+), 18 deletions(-) diff --git a/bazel/grpc_deps.bzl b/bazel/grpc_deps.bzl index dceeace99b6..527db4a0442 100644 --- a/bazel/grpc_deps.bzl +++ b/bazel/grpc_deps.bzl @@ -200,6 +200,11 @@ def grpc_deps(): actual = "@com_google_googleapis//google/monitoring/v3:monitoring_cc_grpc", ) + native.bind( + name = "googleapis_logging_grpc_service", + actual = "@com_google_googleapis//google/logging/v2:logging_cc_grpc", + ) + if "boringssl" not in native.existing_rules(): http_archive( name = "boringssl", diff --git a/src/cpp/ext/filters/logging/BUILD b/src/cpp/ext/filters/logging/BUILD index 2e961e7f286..ad2a9803560 100644 --- a/src/cpp/ext/filters/logging/BUILD +++ b/src/cpp/ext/filters/logging/BUILD @@ -35,6 +35,7 @@ grpc_cc_library( ], external_deps = [ "absl/strings", + "absl/time", ], language = "c++", visibility = [ diff --git a/src/cpp/ext/filters/logging/logging_sink.h b/src/cpp/ext/filters/logging/logging_sink.h index 66ecd60237b..067ba1308d5 100644 --- a/src/cpp/ext/filters/logging/logging_sink.h +++ b/src/cpp/ext/filters/logging/logging_sink.h @@ -23,7 +23,11 @@ #include +#include +#include + #include "absl/strings/string_view.h" +#include "absl/time/time.h" namespace grpc { namespace internal { @@ -52,9 +56,54 @@ class LoggingSink { uint32_t max_message_bytes_; }; + struct Entry { + enum class EventType { + kUnkown = 0, + kClientHeader, + kServerHeader, + kClientMessage, + kServerMessage, + kClientHalfClose, + kServerTrailer, + kCancel + }; + + enum class Logger { kUnkown = 0, kClient, kServer }; + + struct Payload { + std::map metadata; + absl::Duration duration; + uint32_t status_code; + std::string status_message; + std::string status_details; + uint32_t message_length; + std::string message; + }; + + struct Address { + enum class Type { kUnknown = 0, kIpv4, kIpv6, kUnix }; + Type type; + std::string address; + uint32_t ip_port; + }; + + uint64_t call_id; + uint64_t sequence_id; + EventType type; + Logger logger; + Payload payload; + bool payload_truncated; + Address peer; + std::string authority; + std::string service_name; + std::string method_name; + }; + virtual ~LoggingSink() = default; virtual Config FindMatch(bool is_client, absl::string_view path) = 0; + + virtual void LogEntry(Entry entry) = 0; }; } // namespace internal diff --git a/src/cpp/ext/gcp/BUILD b/src/cpp/ext/gcp/BUILD index f74a9703e07..0c28dd07a4c 100644 --- a/src/cpp/ext/gcp/BUILD +++ b/src/cpp/ext/gcp/BUILD @@ -101,6 +101,8 @@ grpc_cc_library( ], external_deps = [ "absl/strings", + "absl/types:optional", + "googleapis_logging_grpc_service", ], language = "c++", tags = ["nofixdeps"], @@ -108,6 +110,9 @@ grpc_cc_library( deps = [ ":observability_config", "//:gpr_platform", + "//:grpc++", + "//:grpc_opencensus_plugin", + "//src/core:env", "//src/cpp/ext/filters/logging:logging_sink", ], ) diff --git a/src/cpp/ext/gcp/observability_logging_sink.cc b/src/cpp/ext/gcp/observability_logging_sink.cc index 9570c881c68..999cb5003bb 100644 --- a/src/cpp/ext/gcp/observability_logging_sink.cc +++ b/src/cpp/ext/gcp/observability_logging_sink.cc @@ -23,17 +23,49 @@ #include #include +#include + +#include "absl/types/optional.h" +#include "google/logging/v2/logging.grpc.pb.h" + +#include + +#include "src/core/lib/gprpp/env.h" +#include "src/cpp/ext/filters/census/open_census_call_tracer.h" namespace grpc { namespace internal { ObservabilityLoggingSink::ObservabilityLoggingSink( - GcpObservabilityConfig::CloudLogging logging_config) { + GcpObservabilityConfig::CloudLogging logging_config, std::string project_id) + : project_id_(std::move(project_id)) { for (auto& client_rpc_event_config : logging_config.client_rpc_events) { - client_configs.emplace_back(client_rpc_event_config); + client_configs_.emplace_back(client_rpc_event_config); } for (auto& server_rpc_event_config : logging_config.server_rpc_events) { - server_configs.emplace_back(server_rpc_event_config); + server_configs_.emplace_back(server_rpc_event_config); + } + std::string endpoint; + absl::optional endpoint_env = + grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); + if (endpoint_env.has_value() && !endpoint_env->empty()) { + endpoint = std::move(*endpoint_env); + } else { + endpoint = "logging.googleapis.com"; + } + ChannelArguments args; + // Disable observability for RPCs on this channel + args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); + // Set keepalive time to 24 hrs to effectively disable keepalive ping, but + // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect. + args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 24 * 60 * 60 * 1000 /* 24 hours */); + args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */); + stub_ = google::logging::v2::LoggingServiceV2::NewStub( + CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args)); + absl::optional authority_env = + grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); + if (authority_env.has_value() && !authority_env->empty()) { + authority_ = std::move(*endpoint_env); } } @@ -48,7 +80,7 @@ LoggingSink::Config ObservabilityLoggingSink::FindMatch( path.substr(0, pos); // service name is before the '/' absl::string_view method = path.substr(pos + 1); // method name starts after the '/' - const auto& configs = is_client ? client_configs : server_configs; + const auto& configs = is_client ? client_configs_ : server_configs_; for (const auto& config : configs) { for (const auto& config_method : config.parsed_methods) { if ((config_method.service == "*") || @@ -66,6 +98,27 @@ LoggingSink::Config ObservabilityLoggingSink::FindMatch( return LoggingSink::Config(0, 0); } +void ObservabilityLoggingSink::LogEntry(Entry /* entry */) { + struct CallContext { + ClientContext context; + google::logging::v2::WriteLogEntriesRequest request; + google::logging::v2::WriteLogEntriesResponse response; + }; + // TODO(yashykt): Implement batching so that we can batch a bunch of log + // entries into a single entry. Also, set a reasonable deadline on the + // context, and actually use the entry. + CallContext* call = new CallContext; + call->context.set_authority(authority_); + stub_->async()->WriteLogEntries(&(call->context), &(call->request), + &(call->response), [call](Status status) { + if (!status.ok()) { + // TODO(yashykt): Log the contents of the + // request on a failure. + } + delete call; + }); +} + ObservabilityLoggingSink::Configuration::Configuration( const GcpObservabilityConfig::CloudLogging::RpcEventConfiguration& rpc_event_config) diff --git a/src/cpp/ext/gcp/observability_logging_sink.h b/src/cpp/ext/gcp/observability_logging_sink.h index 4838cf98bdb..7ca9463474c 100644 --- a/src/cpp/ext/gcp/observability_logging_sink.h +++ b/src/cpp/ext/gcp/observability_logging_sink.h @@ -23,10 +23,14 @@ #include +#include #include #include #include "absl/strings/string_view.h" +#include "google/logging/v2/logging.grpc.pb.h" + +#include #include "src/cpp/ext/filters/logging/logging_sink.h" #include "src/cpp/ext/gcp/observability_config.h" @@ -37,14 +41,16 @@ namespace internal { // Interface for a logging sink that will be used by the logging filter. class ObservabilityLoggingSink : public LoggingSink { public: - explicit ObservabilityLoggingSink( - GcpObservabilityConfig::CloudLogging logging_config); + ObservabilityLoggingSink(GcpObservabilityConfig::CloudLogging logging_config, + std::string project_id); ~ObservabilityLoggingSink() override = default; LoggingSink::Config FindMatch(bool is_client, absl::string_view path) override; + void LogEntry(Entry entry) override; + private: struct Configuration { explicit Configuration( @@ -60,8 +66,12 @@ class ObservabilityLoggingSink : public LoggingSink { uint32_t max_message_bytes = 0; }; - std::vector client_configs; - std::vector server_configs; + std::vector client_configs_; + std::vector server_configs_; + std::string project_id_; + std::shared_ptr channel_; + std::unique_ptr stub_; + std::string authority_; }; } // namespace internal diff --git a/test/cpp/ext/gcp/observability_logging_sink_test.cc b/test/cpp/ext/gcp/observability_logging_sink_test.cc index ae4517f23aa..98e2838538d 100644 --- a/test/cpp/ext/gcp/observability_logging_sink_test.cc +++ b/test/cpp/ext/gcp/observability_logging_sink_test.cc @@ -37,7 +37,7 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigEmpty) { auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0)); // server test @@ -62,7 +62,7 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigClientWildCardEntries) { auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(1024, 4096)); // server test @@ -87,7 +87,7 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigBadPath) { auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); EXPECT_EQ(sink.FindMatch(true, "foo"), LoggingSink::Config(0, 0)); } @@ -110,7 +110,7 @@ TEST(GcpObservabilityLoggingSinkTest, auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "service/bar"), LoggingSink::Config(1024, 4096)); @@ -139,7 +139,7 @@ TEST(GcpObservabilityLoggingSinkTest, auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(1024, 4096)); EXPECT_EQ(sink.FindMatch(true, "foo/baz"), LoggingSink::Config(1024, 4096)); @@ -171,7 +171,7 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigClientMultipleEventEntries) { auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(1024, 4096)); EXPECT_EQ(sink.FindMatch(true, "foo/baz"), LoggingSink::Config(512, 2048)); @@ -198,7 +198,7 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigServerWildCardEntries) { auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0)); // server test @@ -224,7 +224,7 @@ TEST(GcpObservabilityLoggingSinkTest, auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "service/bar"), LoggingSink::Config(0, 0)); EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0)); @@ -253,7 +253,7 @@ TEST(GcpObservabilityLoggingSinkTest, auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0)); EXPECT_EQ(sink.FindMatch(true, "foo/baz"), LoggingSink::Config(0, 0)); @@ -285,7 +285,7 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigServerMultipleEventEntries) { auto config = grpc_core::LoadFromJson( *json, grpc_core::JsonArgs(), &errors); ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors"); - ObservabilityLoggingSink sink(config.cloud_logging.value()); + ObservabilityLoggingSink sink(config.cloud_logging.value(), "test"); // client test EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0)); EXPECT_EQ(sink.FindMatch(true, "foo/baz"), LoggingSink::Config(0, 0));