Logging: Move filter to Core (#32467)

This filter was originally written only for the C++ wrapped layer, but
we have plans to use this for Python (and maybe other wrapped languages
too in the future.)

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/23431/head
Yash Tibrewal 2 years ago committed by GitHub
parent b3fb266937
commit 7af4bc7f1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 63
      src/core/BUILD
  2. 200
      src/core/ext/filters/logging/logging_filter.cc
  3. 15
      src/core/ext/filters/logging/logging_filter.h
  4. 16
      src/core/ext/filters/logging/logging_sink.h
  5. 92
      src/cpp/ext/filters/logging/BUILD
  6. 4
      src/cpp/ext/gcp/BUILD
  7. 4
      src/cpp/ext/gcp/observability.cc
  8. 2
      src/cpp/ext/gcp/observability_logging_sink.cc
  9. 6
      src/cpp/ext/gcp/observability_logging_sink.h
  10. 2
      test/cpp/ext/filters/logging/BUILD
  11. 25
      test/cpp/ext/filters/logging/logging_test.cc
  12. 2
      test/cpp/ext/gcp/observability_logging_sink_test.cc
  13. 1
      tools/distrib/fix_build_deps.py

@ -5350,6 +5350,69 @@ grpc_cc_library(
], ],
) )
grpc_cc_library(
name = "logging_sink",
hdrs = [
"ext/filters/logging/logging_sink.h",
],
external_deps = ["absl/strings"],
language = "c++",
visibility = [
"//src/cpp/ext/gcp:__subpackages__",
"//test:__subpackages__",
],
deps = [
"time",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "logging_filter",
srcs = [
"ext/filters/logging/logging_filter.cc",
],
hdrs = [
"ext/filters/logging/logging_filter.h",
],
external_deps = [
"absl/random",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
language = "c++",
visibility = [
"//src/cpp/ext/gcp:__subpackages__",
"//test:__subpackages__",
],
deps = [
"arena",
"arena_promise",
"cancel_callback",
"channel_args",
"channel_fwd",
"channel_stack_type",
"context",
"logging_sink",
"map",
"pipe",
"poll",
"slice",
"slice_buffer",
"time",
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_public_hdrs",
"//:grpc_resolver",
"//:uri_parser",
],
)
### UPB Targets ### UPB Targets
grpc_upb_proto_library( grpc_upb_proto_library(

@ -18,7 +18,7 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/logging/logging_filter.h" #include "src/core/ext/filters/logging/logging_filter.h"
#include <inttypes.h> #include <inttypes.h>
#include <limits.h> #include <limits.h>
@ -48,6 +48,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/logging/logging_sink.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
@ -70,10 +71,8 @@
#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h" #include "src/core/lib/uri/uri_parser.h"
#include "src/cpp/ext/filters/logging/logging_sink.h"
namespace grpc { namespace grpc_core {
namespace internal {
namespace { namespace {
@ -92,8 +91,7 @@ class MetadataEncoder {
status_details_bin_(status_details_bin), status_details_bin_(status_details_bin),
log_len_(log_len) {} log_len_(log_len) {}
void Encode(const grpc_core::Slice& key_slice, void Encode(const Slice& key_slice, const Slice& value_slice) {
const grpc_core::Slice& value_slice) {
auto key = key_slice.as_string_view(); auto key = key_slice.as_string_view();
auto value = value_slice.as_string_view(); auto value = value_slice.as_string_view();
if (status_details_bin_ != nullptr && key == "grpc-status-details-bin") { if (status_details_bin_ != nullptr && key == "grpc-status-details-bin") {
@ -122,12 +120,11 @@ class MetadataEncoder {
template <typename Which> template <typename Which>
void Encode(Which, const typename Which::ValueType&) {} void Encode(Which, const typename Which::ValueType&) {}
void Encode(grpc_core::GrpcStatusMetadata, grpc_status_code status) { void Encode(GrpcStatusMetadata, grpc_status_code status) {
payload_->status_code = status; payload_->status_code = status;
} }
void Encode(grpc_core::GrpcMessageMetadata, void Encode(GrpcMessageMetadata, const Slice& status_message) {
const grpc_core::Slice& status_message) {
payload_->status_message = std::string(status_message.as_string_view()); payload_->status_message = std::string(status_message.as_string_view());
} }
@ -143,8 +140,8 @@ class MetadataEncoder {
void SetIpPort(absl::string_view s, LoggingSink::Entry::Address* peer) { void SetIpPort(absl::string_view s, LoggingSink::Entry::Address* peer) {
absl::string_view host; absl::string_view host;
absl::string_view port; absl::string_view port;
if (grpc_core::SplitHostPort(absl::string_view(s.data(), s.length()), &host, if (SplitHostPort(absl::string_view(s.data(), s.length()), &host, &port) ==
&port) == 1) { 1) {
if (!host.empty()) { if (!host.empty()) {
peer->address = std::string(host); peer->address = std::string(host);
} }
@ -157,11 +154,9 @@ void SetIpPort(absl::string_view s, LoggingSink::Entry::Address* peer) {
} }
} }
LoggingSink::Entry::Address PeerStringToAddress( LoggingSink::Entry::Address PeerStringToAddress(const Slice& peer_string) {
const grpc_core::Slice& peer_string) {
LoggingSink::Entry::Address address; LoggingSink::Entry::Address address;
absl::StatusOr<grpc_core::URI> uri = absl::StatusOr<URI> uri = URI::Parse(peer_string.as_string_view());
grpc_core::URI::Parse(peer_string.as_string_view());
if (!uri.ok()) { if (!uri.ok()) {
gpr_log(GPR_DEBUG, "peer_string is in invalid format and cannot be logged"); gpr_log(GPR_DEBUG, "peer_string is in invalid format and cannot be logged");
return address; return address;
@ -181,8 +176,8 @@ LoggingSink::Entry::Address PeerStringToAddress(
return address; return address;
} }
void EncodeMessageToPayload(const grpc_core::SliceBuffer* message, void EncodeMessageToPayload(const SliceBuffer* message, uint32_t log_len,
uint32_t log_len, LoggingSink::Entry* entry) { LoggingSink::Entry* entry) {
auto* sb = message->c_slice_buffer(); auto* sb = message->c_slice_buffer();
entry->payload.message_length = sb->length; entry->payload.message_length = sb->length;
// Log the message to a max of the configured message length // Log the message to a max of the configured message length
@ -203,12 +198,12 @@ void EncodeMessageToPayload(const grpc_core::SliceBuffer* message,
class CallData { class CallData {
public: public:
CallData(bool is_client, const grpc_core::CallArgs& call_args, CallData(bool is_client, const CallArgs& call_args,
const std::string& authority) const std::string& authority)
: call_id_(GetCallId()) { : call_id_(GetCallId()) {
absl::string_view path; absl::string_view path;
if (auto* value = call_args.client_initial_metadata->get_pointer( if (auto* value = call_args.client_initial_metadata->get_pointer(
grpc_core::HttpPathMetadata())) { HttpPathMetadata())) {
path = value->as_string_view(); path = value->as_string_view();
} }
std::vector<std::string> parts = std::vector<std::string> parts =
@ -220,7 +215,7 @@ class CallData {
config_ = g_logging_sink->FindMatch(is_client, service_name_, method_name_); config_ = g_logging_sink->FindMatch(is_client, service_name_, method_name_);
if (config_.ShouldLog()) { if (config_.ShouldLog()) {
if (auto* value = call_args.client_initial_metadata->get_pointer( if (auto* value = call_args.client_initial_metadata->get_pointer(
grpc_core::HttpAuthorityMetadata())) { HttpAuthorityMetadata())) {
authority_ = std::string(value->as_string_view()); authority_ = std::string(value->as_string_view());
} else { } else {
authority_ = authority; authority_ = authority;
@ -230,8 +225,7 @@ class CallData {
bool ShouldLog() { return config_.ShouldLog(); } bool ShouldLog() { return config_.ShouldLog(); }
void LogClientHeader(bool is_client, void LogClientHeader(bool is_client, const ClientMetadataHandle& metadata) {
const grpc_core::ClientMetadataHandle& metadata) {
LoggingSink::Entry entry; LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kClientHeader); LoggingSink::Entry::EventType::kClientHeader);
@ -240,7 +234,7 @@ class CallData {
metadata->Encode(&encoder); metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated(); entry.payload_truncated = encoder.truncated();
if (!is_client) { if (!is_client) {
if (auto* value = metadata->get_pointer(grpc_core::PeerString())) { if (auto* value = metadata->get_pointer(PeerString())) {
peer_ = PeerStringToAddress(*value); peer_ = PeerStringToAddress(*value);
} }
} }
@ -254,8 +248,7 @@ class CallData {
g_logging_sink->LogEntry(std::move(entry)); g_logging_sink->LogEntry(std::move(entry));
} }
void LogServerHeader(bool is_client, void LogServerHeader(bool is_client, const ServerMetadata* metadata) {
const grpc_core::ServerMetadata* metadata) {
LoggingSink::Entry entry; LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kServerHeader); LoggingSink::Entry::EventType::kServerHeader);
@ -265,7 +258,7 @@ class CallData {
metadata->Encode(&encoder); metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated(); entry.payload_truncated = encoder.truncated();
if (is_client) { if (is_client) {
if (auto* value = metadata->get_pointer(grpc_core::PeerString())) { if (auto* value = metadata->get_pointer(PeerString())) {
peer_ = PeerStringToAddress(*value); peer_ = PeerStringToAddress(*value);
} }
} }
@ -273,8 +266,7 @@ class CallData {
g_logging_sink->LogEntry(std::move(entry)); g_logging_sink->LogEntry(std::move(entry));
} }
void LogServerTrailer(bool is_client, void LogServerTrailer(bool is_client, const ServerMetadata* metadata) {
const grpc_core::ServerMetadata* metadata) {
LoggingSink::Entry entry; LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kServerTrailer); LoggingSink::Entry::EventType::kServerTrailer);
@ -287,7 +279,7 @@ class CallData {
g_logging_sink->LogEntry(std::move(entry)); g_logging_sink->LogEntry(std::move(entry));
} }
void LogClientMessage(bool is_client, const grpc_core::SliceBuffer* message) { void LogClientMessage(bool is_client, const SliceBuffer* message) {
LoggingSink::Entry entry; LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kClientMessage); LoggingSink::Entry::EventType::kClientMessage);
@ -295,7 +287,7 @@ class CallData {
g_logging_sink->LogEntry(std::move(entry)); g_logging_sink->LogEntry(std::move(entry));
} }
void LogServerMessage(bool is_client, const grpc_core::SliceBuffer* message) { void LogServerMessage(bool is_client, const SliceBuffer* message) {
LoggingSink::Entry entry; LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kServerMessage); LoggingSink::Entry::EventType::kServerMessage);
@ -322,7 +314,7 @@ class CallData {
entry->peer = peer_; entry->peer = peer_;
entry->service_name = service_name_; entry->service_name = service_name_;
entry->method_name = method_name_; entry->method_name = method_name_;
entry->timestamp = grpc_core::Timestamp::Now(); entry->timestamp = Timestamp::Now();
} }
uint64_t call_id_; uint64_t call_id_;
uint32_t sequence_id_ = 0; uint32_t sequence_id_ = 0;
@ -333,12 +325,12 @@ class CallData {
LoggingSink::Config config_; LoggingSink::Config config_;
}; };
class ClientLoggingFilter final : public grpc_core::ChannelFilter { class ClientLoggingFilter final : public ChannelFilter {
public: public:
static const grpc_channel_filter kFilter; static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientLoggingFilter> Create( static absl::StatusOr<ClientLoggingFilter> Create(
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) { const ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
absl::optional<absl::string_view> default_authority = absl::optional<absl::string_view> default_authority =
args.GetString(GRPC_ARG_DEFAULT_AUTHORITY); args.GetString(GRPC_ARG_DEFAULT_AUTHORITY);
if (default_authority.has_value()) { if (default_authority.has_value()) {
@ -347,48 +339,45 @@ class ClientLoggingFilter final : public grpc_core::ChannelFilter {
absl::optional<std::string> server_uri = absl::optional<std::string> server_uri =
args.GetOwnedString(GRPC_ARG_SERVER_URI); args.GetOwnedString(GRPC_ARG_SERVER_URI);
if (server_uri.has_value()) { if (server_uri.has_value()) {
return ClientLoggingFilter(grpc_core::CoreConfiguration::Get() return ClientLoggingFilter(
.resolver_registry() CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
.GetDefaultAuthority(*server_uri)); *server_uri));
} }
return ClientLoggingFilter(""); return ClientLoggingFilter("");
} }
// Construct a promise for one call. // Construct a promise for one call.
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise( ArenaPromise<ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args, CallArgs call_args, NextPromiseFactory next_promise_factory) override {
grpc_core::NextPromiseFactory next_promise_factory) override { CallData* calld = GetContext<Arena>()->ManagedNew<CallData>(
CallData* calld = true, call_args, default_authority_);
grpc_core::GetContext<grpc_core::Arena>()->ManagedNew<CallData>(
true, call_args, default_authority_);
if (!calld->ShouldLog()) { if (!calld->ShouldLog()) {
return next_promise_factory(std::move(call_args)); return next_promise_factory(std::move(call_args));
} }
calld->LogClientHeader(/*is_client=*/true, calld->LogClientHeader(/*is_client=*/true,
call_args.client_initial_metadata); call_args.client_initial_metadata);
call_args.server_initial_metadata->InterceptAndMap( call_args.server_initial_metadata->InterceptAndMap(
[calld](grpc_core::ServerMetadataHandle metadata) { [calld](ServerMetadataHandle metadata) {
calld->LogServerHeader(/*is_client=*/true, metadata.get()); calld->LogServerHeader(/*is_client=*/true, metadata.get());
return metadata; return metadata;
}); });
call_args.client_to_server_messages->InterceptAndMapWithHalfClose( call_args.client_to_server_messages->InterceptAndMapWithHalfClose(
[calld](grpc_core::MessageHandle message) { [calld](MessageHandle message) {
calld->LogClientMessage(/*is_client=*/true, message->payload()); calld->LogClientMessage(/*is_client=*/true, message->payload());
return message; return message;
}, },
[calld] { calld->LogClientHalfClose(/*is_client=*/true); }); [calld] { calld->LogClientHalfClose(/*is_client=*/true); });
call_args.server_to_client_messages->InterceptAndMap( call_args.server_to_client_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) { [calld](MessageHandle message) {
calld->LogServerMessage(/*is_client=*/true, message->payload()); calld->LogServerMessage(/*is_client=*/true, message->payload());
return message; return message;
}); });
return grpc_core::OnCancel( return OnCancel(Map(next_promise_factory(std::move(call_args)),
Map(next_promise_factory(std::move(call_args)), [calld](ServerMetadataHandle md) {
[calld](grpc_core::ServerMetadataHandle md) { calld->LogServerTrailer(/*is_client=*/true, md.get());
calld->LogServerTrailer(/*is_client=*/true, md.get()); return md;
return md; }),
}), [calld]() { calld->LogCancel(/*is_client=*/true); });
[calld]() { calld->LogCancel(/*is_client=*/true); });
} }
private: private:
@ -398,99 +387,90 @@ class ClientLoggingFilter final : public grpc_core::ChannelFilter {
}; };
const grpc_channel_filter ClientLoggingFilter::kFilter = const grpc_channel_filter ClientLoggingFilter::kFilter =
grpc_core::MakePromiseBasedFilter< MakePromiseBasedFilter<ClientLoggingFilter, FilterEndpoint::kClient,
ClientLoggingFilter, grpc_core::FilterEndpoint::kClient, kFilterExaminesServerInitialMetadata |
grpc_core::kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages |
grpc_core::kFilterExaminesInboundMessages | kFilterExaminesOutboundMessages>("logging");
grpc_core::kFilterExaminesOutboundMessages>("logging");
class ServerLoggingFilter final : public grpc_core::ChannelFilter { class ServerLoggingFilter final : public ChannelFilter {
public: public:
static const grpc_channel_filter kFilter; static const grpc_channel_filter kFilter;
static absl::StatusOr<ServerLoggingFilter> Create( static absl::StatusOr<ServerLoggingFilter> Create(
const grpc_core::ChannelArgs& /*args*/, const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/) {
ChannelFilter::Args /*filter_args*/) {
return ServerLoggingFilter(); return ServerLoggingFilter();
} }
// Construct a promise for one call. // Construct a promise for one call.
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise( ArenaPromise<ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args, CallArgs call_args, NextPromiseFactory next_promise_factory) override {
grpc_core::NextPromiseFactory next_promise_factory) override { CallData* calld = GetContext<Arena>()->ManagedNew<CallData>(
CallData* calld = false, call_args, /*default_authority=*/"");
grpc_core::GetContext<grpc_core::Arena>()->ManagedNew<CallData>(
false, call_args, /*default_authority=*/"");
if (!calld->ShouldLog()) { if (!calld->ShouldLog()) {
return next_promise_factory(std::move(call_args)); return next_promise_factory(std::move(call_args));
} }
calld->LogClientHeader(/*is_client=*/false, calld->LogClientHeader(/*is_client=*/false,
call_args.client_initial_metadata); call_args.client_initial_metadata);
call_args.server_initial_metadata->InterceptAndMap( call_args.server_initial_metadata->InterceptAndMap(
[calld](grpc_core::ServerMetadataHandle metadata) { [calld](ServerMetadataHandle metadata) {
calld->LogServerHeader(/*is_client=*/false, metadata.get()); calld->LogServerHeader(/*is_client=*/false, metadata.get());
return metadata; return metadata;
}); });
call_args.client_to_server_messages->InterceptAndMapWithHalfClose( call_args.client_to_server_messages->InterceptAndMapWithHalfClose(
[calld](grpc_core::MessageHandle message) { [calld](MessageHandle message) {
calld->LogClientMessage(/*is_client=*/false, message->payload()); calld->LogClientMessage(/*is_client=*/false, message->payload());
return message; return message;
}, },
[calld] { calld->LogClientHalfClose(/*is_client=*/false); }); [calld] { calld->LogClientHalfClose(/*is_client=*/false); });
call_args.server_to_client_messages->InterceptAndMap( call_args.server_to_client_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) { [calld](MessageHandle message) {
calld->LogServerMessage(/*is_client=*/false, message->payload()); calld->LogServerMessage(/*is_client=*/false, message->payload());
return message; return message;
}); });
return grpc_core::OnCancel( return OnCancel(Map(next_promise_factory(std::move(call_args)),
Map(next_promise_factory(std::move(call_args)), [calld](ServerMetadataHandle md) {
[calld](grpc_core::ServerMetadataHandle md) { calld->LogServerTrailer(/*is_client=*/false,
calld->LogServerTrailer(/*is_client=*/false, md.get()); md.get());
return md; return md;
}), }),
[calld]() { calld->LogCancel(/*is_client=*/false); }); [calld]() { calld->LogCancel(/*is_client=*/false); });
} }
}; };
const grpc_channel_filter ServerLoggingFilter::kFilter = const grpc_channel_filter ServerLoggingFilter::kFilter =
grpc_core::MakePromiseBasedFilter< MakePromiseBasedFilter<ServerLoggingFilter, FilterEndpoint::kServer,
ServerLoggingFilter, grpc_core::FilterEndpoint::kServer, kFilterExaminesServerInitialMetadata |
grpc_core::kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages |
grpc_core::kFilterExaminesInboundMessages | kFilterExaminesOutboundMessages>("logging");
grpc_core::kFilterExaminesOutboundMessages>("logging");
} // namespace } // namespace
void RegisterLoggingFilter(LoggingSink* sink) { void RegisterLoggingFilter(LoggingSink* sink) {
g_logging_sink = sink; g_logging_sink = sink;
grpc_core::CoreConfiguration::RegisterBuilder( CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
[](grpc_core::CoreConfiguration::Builder* builder) { builder->channel_init()->RegisterStage(
builder->channel_init()->RegisterStage( GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
GRPC_SERVER_CHANNEL, INT_MAX, // TODO(yashykt) : Figure out a good place to place this channel
[](grpc_core::ChannelStackBuilder* builder) { // arg
// TODO(yashykt) : Figure out a good place to place this channel if (builder->channel_args()
// arg .GetInt("grpc.experimental.enable_observability")
if (builder->channel_args() .value_or(true)) {
.GetInt("grpc.experimental.enable_observability") builder->PrependFilter(&ServerLoggingFilter::kFilter);
.value_or(true)) { }
builder->PrependFilter(&ServerLoggingFilter::kFilter); return true;
} });
return true; builder->channel_init()->RegisterStage(
}); GRPC_CLIENT_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
builder->channel_init()->RegisterStage( // TODO(yashykt) : Figure out a good place to place this channel
GRPC_CLIENT_CHANNEL, INT_MAX, // arg
[](grpc_core::ChannelStackBuilder* builder) { if (builder->channel_args()
// TODO(yashykt) : Figure out a good place to place this channel .GetInt("grpc.experimental.enable_observability")
// arg .value_or(true)) {
if (builder->channel_args() builder->PrependFilter(&ClientLoggingFilter::kFilter);
.GetInt("grpc.experimental.enable_observability") }
.value_or(true)) { return true;
builder->PrependFilter(&ClientLoggingFilter::kFilter); });
} });
return true;
});
});
} }
} // namespace internal } // namespace grpc_core
} // namespace grpc

@ -16,20 +16,17 @@
// //
// //
#ifndef GRPC_SRC_CPP_EXT_FILTERS_LOGGING_LOGGING_FILTER_H #ifndef GRPC_SRC_CORE_EXT_FILTERS_LOGGING_LOGGING_FILTER_H
#define GRPC_SRC_CPP_EXT_FILTERS_LOGGING_LOGGING_FILTER_H #define GRPC_SRC_CORE_EXT_FILTERS_LOGGING_LOGGING_FILTER_H
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/logging/logging_sink.h" #include "src/core/ext/filters/logging/logging_sink.h"
namespace grpc { namespace grpc_core {
namespace internal {
// TODO(yashykt): Add logging sink registration
void RegisterLoggingFilter(LoggingSink* sink); void RegisterLoggingFilter(LoggingSink* sink);
} // namespace internal } // namespace grpc_core
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_FILTERS_LOGGING_LOGGING_FILTER_H #endif // GRPC_SRC_CORE_EXT_FILTERS_LOGGING_LOGGING_FILTER_H

@ -16,8 +16,8 @@
// //
// //
#ifndef GRPC_SRC_CPP_EXT_FILTERS_LOGGING_LOGGING_SINK_H #ifndef GRPC_SRC_CORE_EXT_FILTERS_LOGGING_LOGGING_SINK_H
#define GRPC_SRC_CPP_EXT_FILTERS_LOGGING_LOGGING_SINK_H #define GRPC_SRC_CORE_EXT_FILTERS_LOGGING_LOGGING_SINK_H
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
@ -30,8 +30,7 @@
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
namespace grpc { namespace grpc_core {
namespace internal {
// Interface for a logging sink that will be used by the logging filter. // Interface for a logging sink that will be used by the logging filter.
class LoggingSink { class LoggingSink {
@ -77,7 +76,7 @@ class LoggingSink {
struct Payload { struct Payload {
std::map<std::string, std::string> metadata; std::map<std::string, std::string> metadata;
grpc_core::Duration timeout; Duration timeout;
uint32_t status_code = 0; uint32_t status_code = 0;
std::string status_message; std::string status_message;
std::string status_details; std::string status_details;
@ -102,7 +101,7 @@ class LoggingSink {
std::string authority; std::string authority;
std::string service_name; std::string service_name;
std::string method_name; std::string method_name;
grpc_core::Timestamp timestamp; Timestamp timestamp;
}; };
virtual ~LoggingSink() = default; virtual ~LoggingSink() = default;
@ -113,7 +112,6 @@ class LoggingSink {
virtual void LogEntry(Entry entry) = 0; virtual void LogEntry(Entry entry) = 0;
}; };
} // namespace internal } // namespace grpc_core
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_FILTERS_LOGGING_LOGGING_SINK_H #endif // GRPC_SRC_CORE_EXT_FILTERS_LOGGING_LOGGING_SINK_H

@ -1,92 +0,0 @@
# gRPC Bazel BUILD file.
#
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load(
"//bazel:grpc_build_system.bzl",
"grpc_cc_library",
)
licenses(["reciprocal"])
package(
default_visibility = ["//visibility:public"],
features = [
"layering_check",
],
)
grpc_cc_library(
name = "logging_sink",
hdrs = [
"logging_sink.h",
],
external_deps = ["absl/strings"],
language = "c++",
visibility = [
"//src/cpp/ext/gcp:__subpackages__",
"//test:__subpackages__",
],
deps = [
"//:gpr_platform",
"//src/core:time",
],
)
grpc_cc_library(
name = "logging_filter",
srcs = [
"logging_filter.cc",
],
hdrs = [
"logging_filter.h",
],
external_deps = [
"absl/random",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
language = "c++",
visibility = [
"//src/cpp/ext/gcp:__subpackages__",
"//test:__subpackages__",
],
deps = [
"logging_sink",
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_public_hdrs",
"//:grpc_resolver",
"//:uri_parser",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:cancel_callback",
"//src/core:channel_args",
"//src/core:channel_fwd",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:map",
"//src/core:pipe",
"//src/core:poll",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:time",
],
)

@ -58,7 +58,7 @@ grpc_cc_library(
"//:gpr", "//:gpr",
"//:grpc++", "//:grpc++",
"//:grpc_opencensus_plugin", "//:grpc_opencensus_plugin",
"//src/cpp/ext/filters/logging:logging_filter", "//src/core:logging_filter",
], ],
) )
@ -125,8 +125,8 @@ grpc_cc_library(
"//src/core:default_event_engine", "//src/core:default_event_engine",
"//src/core:env", "//src/core:env",
"//src/core:json", "//src/core:json",
"//src/core:logging_sink",
"//src/core:time", "//src/core:time",
"//src/cpp/ext/filters/logging:logging_sink",
], ],
) )

@ -43,9 +43,9 @@
#include <grpcpp/security/credentials.h> #include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h> #include <grpcpp/support/channel_arguments.h>
#include "src/core/ext/filters/logging/logging_filter.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h" #include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h" #include "src/cpp/ext/filters/census/open_census_call_tracer.h"
#include "src/cpp/ext/filters/logging/logging_filter.h"
#include "src/cpp/ext/gcp/environment_autodetect.h" #include "src/cpp/ext/gcp/environment_autodetect.h"
#include "src/cpp/ext/gcp/observability_config.h" #include "src/cpp/ext/gcp/observability_config.h"
#include "src/cpp/ext/gcp/observability_logging_sink.h" #include "src/cpp/ext/gcp/observability_logging_sink.h"
@ -172,7 +172,7 @@ absl::Status GcpObservabilityInit() {
if (config->cloud_logging.has_value()) { if (config->cloud_logging.has_value()) {
g_logging_sink = new grpc::internal::ObservabilityLoggingSink( g_logging_sink = new grpc::internal::ObservabilityLoggingSink(
config->cloud_logging.value(), config->project_id, config->labels); config->cloud_logging.value(), config->project_id, config->labels);
grpc::internal::RegisterLoggingFilter(g_logging_sink); grpc_core::RegisterLoggingFilter(g_logging_sink);
} }
return absl::OkStatus(); return absl::OkStatus();
} }

@ -54,6 +54,8 @@
namespace grpc { namespace grpc {
namespace internal { namespace internal {
using grpc_core::LoggingSink;
ObservabilityLoggingSink::ObservabilityLoggingSink( ObservabilityLoggingSink::ObservabilityLoggingSink(
GcpObservabilityConfig::CloudLogging logging_config, std::string project_id, GcpObservabilityConfig::CloudLogging logging_config, std::string project_id,
std::map<std::string, std::string> labels) std::map<std::string, std::string> labels)

@ -37,8 +37,8 @@
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include "src/core/ext/filters/logging/logging_sink.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/cpp/ext/filters/logging/logging_sink.h"
#include "src/cpp/ext/gcp/environment_autodetect.h" #include "src/cpp/ext/gcp/environment_autodetect.h"
#include "src/cpp/ext/gcp/observability_config.h" #include "src/cpp/ext/gcp/observability_config.h"
@ -46,7 +46,7 @@ namespace grpc {
namespace internal { namespace internal {
// Interface for a logging sink that will be used by the logging filter. // Interface for a logging sink that will be used by the logging filter.
class ObservabilityLoggingSink : public LoggingSink { class ObservabilityLoggingSink : public grpc_core::LoggingSink {
public: public:
ObservabilityLoggingSink(GcpObservabilityConfig::CloudLogging logging_config, ObservabilityLoggingSink(GcpObservabilityConfig::CloudLogging logging_config,
std::string project_id, std::string project_id,
@ -115,7 +115,7 @@ class ObservabilityLoggingSink : public LoggingSink {
}; };
// Exposed for just for testing purposes // Exposed for just for testing purposes
void EntryToJsonStructProto(LoggingSink::Entry entry, void EntryToJsonStructProto(grpc_core::LoggingSink::Entry entry,
::google::protobuf::Struct* json_payload); ::google::protobuf::Struct* json_payload);
} // namespace internal } // namespace internal

@ -34,7 +34,7 @@ grpc_cc_test(
], ],
deps = [ deps = [
"//:grpc++", "//:grpc++",
"//src/cpp/ext/filters/logging:logging_filter", "//src/core:logging_filter",
"//src/cpp/ext/gcp:observability_logging_sink", "//src/cpp/ext/gcp:observability_logging_sink",
"//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util", "//test/core/util:grpc_test_util",

@ -28,8 +28,8 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include <grpcpp/support/status.h> #include <grpcpp/support/status.h>
#include "src/core/ext/filters/logging/logging_filter.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/cpp/ext/filters/logging/logging_filter.h"
#include "src/cpp/ext/gcp/observability_logging_sink.h" #include "src/cpp/ext/gcp/observability_logging_sink.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "src/proto/grpc/testing/echo_messages.pb.h" #include "src/proto/grpc/testing/echo_messages.pb.h"
@ -42,7 +42,7 @@ namespace testing {
namespace { namespace {
using grpc::internal::LoggingSink; using grpc_core::LoggingSink;
using ::testing::AllOf; using ::testing::AllOf;
using ::testing::Eq; using ::testing::Eq;
@ -60,7 +60,7 @@ class MyTestServiceImpl : public TestServiceImpl {
} }
}; };
class TestLoggingSink : public grpc::internal::LoggingSink { class TestLoggingSink : public LoggingSink {
public: public:
Config FindMatch(bool /* is_client */, absl::string_view /* service */, Config FindMatch(bool /* is_client */, absl::string_view /* service */,
absl::string_view /* method */) override { absl::string_view /* method */) override {
@ -105,7 +105,7 @@ class LoggingTest : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { static void SetUpTestSuite() {
g_test_logging_sink = new TestLoggingSink; g_test_logging_sink = new TestLoggingSink;
grpc::internal::RegisterLoggingFilter(g_test_logging_sink); grpc_core::RegisterLoggingFilter(g_test_logging_sink);
} }
void SetUp() override { void SetUp() override {
@ -149,8 +149,7 @@ class LoggingTest : public ::testing::Test {
}; };
TEST_F(LoggingTest, SimpleRpc) { TEST_F(LoggingTest, SimpleRpc) {
g_test_logging_sink->SetConfig( g_test_logging_sink->SetConfig(LoggingSink::Config(4096, 4096));
grpc::internal::LoggingSink::Config(4096, 4096));
EchoRequest request; EchoRequest request;
request.set_message("foo"); request.set_message("foo");
EchoResponse response; EchoResponse response;
@ -306,7 +305,7 @@ TEST_F(LoggingTest, SimpleRpc) {
} }
TEST_F(LoggingTest, LoggingDisabled) { TEST_F(LoggingTest, LoggingDisabled) {
g_test_logging_sink->SetConfig(grpc::internal::LoggingSink::Config()); g_test_logging_sink->SetConfig(LoggingSink::Config());
EchoRequest request; EchoRequest request;
request.set_message("foo"); request.set_message("foo");
EchoResponse response; EchoResponse response;
@ -318,8 +317,8 @@ TEST_F(LoggingTest, LoggingDisabled) {
} }
TEST_F(LoggingTest, MetadataTruncated) { TEST_F(LoggingTest, MetadataTruncated) {
g_test_logging_sink->SetConfig(grpc::internal::LoggingSink::Config( g_test_logging_sink->SetConfig(
40 /* expect truncated metadata*/, 4096)); LoggingSink::Config(40 /* expect truncated metadata*/, 4096));
EchoRequest request; EchoRequest request;
request.set_message("foo"); request.set_message("foo");
EchoResponse response; EchoResponse response;
@ -476,7 +475,7 @@ TEST_F(LoggingTest, MetadataTruncated) {
} }
TEST_F(LoggingTest, PayloadTruncated) { TEST_F(LoggingTest, PayloadTruncated) {
g_test_logging_sink->SetConfig(grpc::internal::LoggingSink::Config(4096, 10)); g_test_logging_sink->SetConfig(LoggingSink::Config(4096, 10));
EchoRequest request; EchoRequest request;
// The following message should get truncated // The following message should get truncated
request.set_message("Hello World"); request.set_message("Hello World");
@ -637,8 +636,7 @@ TEST_F(LoggingTest, PayloadTruncated) {
} }
TEST_F(LoggingTest, CancelledRpc) { TEST_F(LoggingTest, CancelledRpc) {
g_test_logging_sink->SetConfig( g_test_logging_sink->SetConfig(LoggingSink::Config(4096, 4096));
grpc::internal::LoggingSink::Config(4096, 4096));
EchoRequest request; EchoRequest request;
request.set_message("foo"); request.set_message("foo");
const int kCancelDelayUs = 10 * 1000; const int kCancelDelayUs = 10 * 1000;
@ -679,8 +677,7 @@ TEST_F(LoggingTest, CancelledRpc) {
} }
TEST_F(LoggingTest, ServerCancelsRpc) { TEST_F(LoggingTest, ServerCancelsRpc) {
g_test_logging_sink->SetConfig( g_test_logging_sink->SetConfig(LoggingSink::Config(4096, 4096));
grpc::internal::LoggingSink::Config(4096, 4096));
EchoRequest request; EchoRequest request;
request.set_message("foo"); request.set_message("foo");
auto* error = request.mutable_param()->mutable_expected_error(); auto* error = request.mutable_param()->mutable_expected_error();

@ -29,6 +29,8 @@ namespace internal {
namespace { namespace {
using grpc_core::LoggingSink;
TEST(GcpObservabilityLoggingSinkTest, LoggingConfigEmpty) { TEST(GcpObservabilityLoggingSinkTest, LoggingConfigEmpty) {
const char* json_str = R"json({ const char* json_str = R"json({
"cloud_logging": { "cloud_logging": {

@ -425,7 +425,6 @@ for dirname in [
"", "",
"src/core", "src/core",
"src/cpp/ext/gcp", "src/cpp/ext/gcp",
"src/cpp/ext/filters/logging",
"test/core/backoff", "test/core/backoff",
"test/core/uri", "test/core/uri",
"test/core/util", "test/core/util",

Loading…
Cancel
Save