Logging filter (#31755)

* initial

* Intermediate

* Another try

* Try multiple necesary pulls

* Filter works other than client half close

* Fixes

* Add a cancelled RPC test

* Handle trailer only responses

* Tests for disabled logging and truncated payloads

* Fix authority and peer

* Add TODOs and asserts for half-close

* Fix tests for half-close and cancel

* 2d748fcb1cf45cac62729b8346ad15e6abc79e97

* Fix sanity checks

* Strict bazel build

* Fix package

* IWYU

* Fix cmake

* Explicit cast to string

* Size casts

* Fix Arena leak and disable macos build for now

* Reviewer comments
pull/32039/head
Yash Tibrewal 2 years ago committed by GitHub
parent 8d5e0a71a6
commit ad6f6c49e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      src/core/lib/channel/promise_based_filter.cc
  2. 49
      src/cpp/ext/filters/logging/BUILD
  3. 540
      src/cpp/ext/filters/logging/logging_filter.cc
  4. 35
      src/cpp/ext/filters/logging/logging_filter.h
  5. 27
      src/cpp/ext/filters/logging/logging_sink.h
  6. 28
      src/cpp/ext/gcp/observability_logging_sink.cc
  7. 4
      src/cpp/ext/gcp/observability_logging_sink.h
  8. 45
      test/cpp/ext/filters/logging/BUILD
  9. 688
      test/cpp/ext/filters/logging/logging_test.cc
  10. 3
      test/cpp/ext/gcp/BUILD
  11. 71
      test/cpp/ext/gcp/observability_logging_sink_test.cc
  12. 7
      tools/buildgen/extract_metadata_from_bazel_xml.py

@ -1912,11 +1912,12 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
wake = true;
}
if (send_message() != nullptr && batch->send_message) {
if (send_message() != nullptr && batch.is_captured() && batch->send_message) {
send_message()->StartOp(batch);
wake = true;
}
if (receive_message() != nullptr && batch->recv_message) {
if (receive_message() != nullptr && batch.is_captured() &&
batch->recv_message) {
receive_message()->StartOp(batch);
wake = true;
}

@ -44,3 +44,52 @@ grpc_cc_library(
"//src/core:time",
],
)
grpc_cc_library(
name = "logging_filter",
srcs = [
"logging_filter.cc",
],
hdrs = [
"logging_filter.h",
],
external_deps = [
"absl/random",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
language = "c++",
visibility = [
"//src/cpp/ext/gcp:__subpackages__",
"//test:__subpackages__",
],
deps = [
"logging_sink",
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_resolver",
"//:promise",
"//:uri_parser",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:basic_seq",
"//src/core:cancel_callback",
"//src/core:channel_args",
"//src/core:channel_fwd",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:for_each",
"//src/core:latch",
"//src/core:promise_like",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:try_concurrently",
],
)

@ -0,0 +1,540 @@
//
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/logging/logging_filter.h"
#include <limits.h>
#include <stdint.h>
#include <algorithm>
#include <cstddef>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/detail/promise_like.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/map_pipe.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_concurrently.h"
#include "src/core/lib/resolver/resolver_registry.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"
#include "src/cpp/ext/filters/logging/logging_sink.h"
namespace grpc {
namespace internal {
namespace {
LoggingSink* g_logging_sink = nullptr;
uint64_t GetCallId() {
thread_local absl::InsecureBitGen gen;
return absl::Uniform(gen, 0u, std::numeric_limits<uint64_t>::max());
}
class LoggingFilter : public grpc_core::ChannelFilter {
protected:
explicit LoggingFilter() {}
private:
};
class MetadataEncoder {
public:
MetadataEncoder(LoggingSink::Entry::Payload* payload,
absl::string_view* status_details_bin, uint64_t log_len)
: payload_(payload),
status_details_bin_(status_details_bin),
log_len_(log_len) {}
void Encode(const grpc_core::Slice& key_slice,
const grpc_core::Slice& value_slice) {
auto key = key_slice.as_string_view();
auto value = value_slice.as_string_view();
if (status_details_bin_ != nullptr && key == "grpc-status-details-bin") {
*status_details_bin_ = value;
return;
}
if (absl::ConsumePrefix(&key, "grpc-")) {
// skip all other grpc- headers
return;
}
uint64_t mdentry_len = key.length() + value.length();
if (mdentry_len > log_len_) {
gpr_log(GPR_DEBUG,
"Skipped metadata key because of max metadata logging bytes %lu "
"(current) vs %lu (max less already accounted metadata)",
mdentry_len, log_len_);
truncated_ = true;
return;
}
payload_->metadata.emplace(std::string(key), std::string(value));
log_len_ -= mdentry_len;
}
template <typename Which>
void Encode(Which, const typename Which::ValueType&) {}
bool truncated() const { return truncated_; }
private:
LoggingSink::Entry::Payload* const payload_;
absl::string_view* const status_details_bin_;
uint64_t log_len_;
bool truncated_ = false;
};
void SetIpPort(absl::string_view s, LoggingSink::Entry::Address* peer) {
absl::string_view host;
absl::string_view port;
if (grpc_core::SplitHostPort(absl::string_view(s.data(), s.length()), &host,
&port) == 1) {
if (!host.empty()) {
peer->address = std::string(host);
}
if (!port.empty()) {
if (!absl::SimpleAtoi(absl::string_view(port.data(), port.size()),
&peer->ip_port)) {
peer->ip_port = 0;
}
}
}
}
LoggingSink::Entry::Address PeerStringToAddress(absl::string_view peer_string) {
LoggingSink::Entry::Address address;
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(peer_string);
if (!uri.ok()) {
gpr_log(GPR_DEBUG, "peer_string is in invalid format and cannot be logged");
return address;
}
if (uri->scheme() == "ipv4") {
address.type = LoggingSink::Entry::Address::Type::kIpv4;
SetIpPort(uri->path(), &address);
} else if (uri->scheme() == "ipv6") {
address.type = LoggingSink::Entry::Address::Type::kIpv6;
// TODO(zpencer): per grfc, verify RFC5952 section 4 styled addrs in use
SetIpPort(uri->path(), &address);
} else if (uri->scheme() == "unix") {
address.type = LoggingSink::Entry::Address::Type::kUnix;
address.address = uri->path();
}
return address;
}
void EncodeMessageToPayload(const grpc_core::SliceBuffer* message,
uint32_t log_len, LoggingSink::Entry* entry) {
auto* sb = message->c_slice_buffer();
entry->payload.message_length = sb->length;
// Log the message to a max of the configured message length
for (size_t i = 0; i < sb->count; i++) {
absl::StrAppend(
&entry->payload.message,
absl::string_view(
reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(sb->slices[i])),
std::min(static_cast<size_t>(GRPC_SLICE_LENGTH(sb->slices[i])),
static_cast<size_t>(log_len))));
if (log_len < GRPC_SLICE_LENGTH(sb->slices[i])) {
entry->payload_truncated = true;
break;
}
log_len -= GRPC_SLICE_LENGTH(sb->slices[i]);
}
}
class CallData {
public:
CallData(bool is_client, const grpc_core::CallArgs& call_args,
const std::string& authority)
: call_id_(GetCallId()) {
absl::string_view path;
if (auto* value = call_args.client_initial_metadata->get_pointer(
grpc_core::HttpPathMetadata())) {
path = value->as_string_view();
}
std::vector<std::string> parts =
absl::StrSplit(path, '/', absl::SkipEmpty());
if (parts.size() == 2) {
service_name_ = std::move(parts[0]);
method_name_ = std::move(parts[1]);
}
config_ = g_logging_sink->FindMatch(is_client, service_name_, method_name_);
if (config_.ShouldLog()) {
if (auto* value = call_args.client_initial_metadata->get_pointer(
grpc_core::HttpAuthorityMetadata())) {
authority_ = std::string(value->as_string_view());
} else {
authority_ = authority;
}
}
}
bool ShouldLog() { return config_.ShouldLog(); }
void LogClientHeader(bool is_client,
const grpc_core::ClientMetadataHandle& metadata) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kClientHeader);
MetadataEncoder encoder(&entry.payload, nullptr,
config_.max_metadata_bytes());
metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated();
if (!is_client) {
if (auto* value = metadata->get_pointer(grpc_core::PeerString())) {
peer_ = PeerStringToAddress(*value);
}
}
g_logging_sink->LogEntry(std::move(entry));
}
void LogClientHalfClose(bool is_client) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kClientHalfClose);
g_logging_sink->LogEntry(std::move(entry));
}
void LogServerHeader(bool is_client,
const grpc_core::ServerMetadata* metadata) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kServerHeader);
if (metadata != nullptr) {
MetadataEncoder encoder(&entry.payload, nullptr,
config_.max_metadata_bytes());
metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated();
if (is_client) {
if (auto* value = metadata->get_pointer(grpc_core::PeerString())) {
peer_ = PeerStringToAddress(*value);
}
}
}
g_logging_sink->LogEntry(std::move(entry));
}
void LogServerTrailer(bool is_client,
const grpc_core::ServerMetadata* metadata) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kServerTrailer);
if (metadata != nullptr) {
MetadataEncoder encoder(&entry.payload, nullptr,
config_.max_metadata_bytes());
metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated();
}
g_logging_sink->LogEntry(std::move(entry));
}
void LogClientMessage(bool is_client, const grpc_core::SliceBuffer* message) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kClientMessage);
EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
g_logging_sink->LogEntry(std::move(entry));
}
void LogServerMessage(bool is_client, const grpc_core::SliceBuffer* message) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kServerMessage);
EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
g_logging_sink->LogEntry(std::move(entry));
}
void LogCancel(bool is_client) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client,
LoggingSink::Entry::EventType::kCancel);
g_logging_sink->LogEntry(std::move(entry));
}
private:
void SetCommonEntryFields(LoggingSink::Entry* entry, bool is_client,
LoggingSink::Entry::EventType event_type) {
entry->call_id = call_id_;
entry->sequence_id = sequence_id_++;
entry->type = event_type;
entry->logger = is_client ? LoggingSink::Entry::Logger::kClient
: LoggingSink::Entry::Logger::kServer;
entry->authority = authority_;
entry->peer = peer_;
entry->service_name = service_name_;
entry->method_name = method_name_;
}
uint64_t call_id_;
uint32_t sequence_id_ = 0;
std::string service_name_;
std::string method_name_;
std::string authority_;
LoggingSink::Entry::Address peer_;
LoggingSink::Config config_;
};
class ClientLoggingFilter final : public grpc_core::ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientLoggingFilter> Create(
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
absl::optional<absl::string_view> default_authority =
args.GetString(GRPC_ARG_DEFAULT_AUTHORITY);
if (default_authority.has_value()) {
return ClientLoggingFilter(std::string(default_authority.value()));
}
absl::optional<std::string> server_uri =
args.GetOwnedString(GRPC_ARG_SERVER_URI);
if (server_uri.has_value()) {
return ClientLoggingFilter(grpc_core::CoreConfiguration::Get()
.resolver_registry()
.GetDefaultAuthority(*server_uri));
}
return ClientLoggingFilter("");
}
// Construct a promise for one call.
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override {
CallData* calld =
grpc_core::GetContext<grpc_core::Arena>()->ManagedNew<CallData>(
true, call_args, default_authority_);
if (!calld->ShouldLog()) {
return next_promise_factory(std::move(call_args));
}
calld->LogClientHeader(/*is_client=*/true,
call_args.client_initial_metadata);
auto* server_initial_metadata = call_args.server_initial_metadata;
auto incoming_mapper =
grpc_core::PipeMapper<grpc_core::MessageHandle>::Intercept(
*call_args.incoming_messages);
grpc_core::PipeMapper<grpc_core::MessageHandle> outgoing_mapper =
grpc_core::PipeMapper<grpc_core::MessageHandle>::Intercept(
*call_args.outgoing_messages);
return grpc_core::OnCancel(
grpc_core::TryConcurrently(
grpc_core::Seq(
next_promise_factory(std::move(call_args)),
[calld](grpc_core::ServerMetadataHandle metadata) mutable
-> grpc_core::ServerMetadataHandle {
calld->LogServerTrailer(/*is_client=*/true, metadata.get());
return metadata;
}))
.NecessaryPull(grpc_core::Seq(
server_initial_metadata->Wait(),
[calld](
grpc_core::ServerMetadata** server_initial_metadata) mutable
-> grpc_core::ArenaPromise<absl::Status> {
if (server_initial_metadata != nullptr) {
calld->LogServerHeader(/*is_client=*/true,
*server_initial_metadata);
}
return grpc_core::ImmediateOkStatus();
}))
.NecessaryPull(incoming_mapper.TakeAndRun(
[calld](grpc_core::MessageHandle message)
-> absl::StatusOr<grpc_core::MessageHandle> {
calld->LogServerMessage(/*is_client=*/true,
message->payload());
return message;
}))
.NecessaryPush(grpc_core::Seq(
outgoing_mapper.TakeAndRun(
[calld](grpc_core::MessageHandle message)
-> absl::StatusOr<grpc_core::MessageHandle> {
calld->LogClientMessage(/*is_client=*/true,
message->payload());
return message;
}),
[calld]() mutable -> grpc_core::ArenaPromise<absl::Status> {
calld->LogClientHalfClose(/*is_client=*/true);
return grpc_core::ImmediateOkStatus();
})),
[calld]() { calld->LogCancel(/*is_client=*/true); });
}
private:
explicit ClientLoggingFilter(std::string default_authority)
: default_authority_(std::move(default_authority)) {}
std::string default_authority_;
};
const grpc_channel_filter ClientLoggingFilter::kFilter =
grpc_core::MakePromiseBasedFilter<
ClientLoggingFilter, grpc_core::FilterEndpoint::kClient,
grpc_core::kFilterExaminesServerInitialMetadata |
grpc_core::kFilterExaminesInboundMessages |
grpc_core::kFilterExaminesOutboundMessages>("logging");
class ServerLoggingFilter final : public grpc_core::ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ServerLoggingFilter> Create(
const grpc_core::ChannelArgs& /*args*/,
ChannelFilter::Args /*filter_args*/) {
return ServerLoggingFilter();
}
// Construct a promise for one call.
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override {
CallData* calld =
grpc_core::GetContext<grpc_core::Arena>()->ManagedNew<CallData>(
false, call_args, /*default_authority=*/"");
if (!calld->ShouldLog()) {
return next_promise_factory(std::move(call_args));
}
calld->LogClientHeader(/*is_client=*/false,
call_args.client_initial_metadata);
auto* server_initial_metadata = call_args.server_initial_metadata;
auto incoming_mapper =
grpc_core::PipeMapper<grpc_core::MessageHandle>::Intercept(
*call_args.incoming_messages);
grpc_core::PipeMapper<grpc_core::MessageHandle> outgoing_mapper =
grpc_core::PipeMapper<grpc_core::MessageHandle>::Intercept(
*call_args.outgoing_messages);
return grpc_core::OnCancel(
grpc_core::TryConcurrently(
grpc_core::Seq(
next_promise_factory(std::move(call_args)),
[calld](grpc_core::ServerMetadataHandle metadata) mutable
-> grpc_core::ServerMetadataHandle {
calld->LogServerTrailer(/*is_client=*/false, metadata.get());
return metadata;
}))
.Push(grpc_core::Seq(
server_initial_metadata->Wait(),
[calld](
grpc_core::ServerMetadata** server_initial_metadata) mutable
-> grpc_core::ArenaPromise<absl::Status> {
calld->LogServerHeader(/*is_client=*/false,
*server_initial_metadata);
return grpc_core::ImmediateOkStatus();
}))
.Push(outgoing_mapper.TakeAndRun(
[calld](grpc_core::MessageHandle message)
-> absl::StatusOr<grpc_core::MessageHandle> {
calld->LogServerMessage(/*is_client=*/false,
message->payload());
return message;
}))
.NecessaryPull(grpc_core::Seq(
incoming_mapper.TakeAndRun(
[calld](grpc_core::MessageHandle message)
-> absl::StatusOr<grpc_core::MessageHandle> {
calld->LogClientMessage(/*is_client=*/false,
message->payload());
return message;
}),
[calld]() mutable -> grpc_core::ArenaPromise<absl::Status> {
calld->LogClientHalfClose(/*is_client=*/false);
return grpc_core::ImmediateOkStatus();
})),
[calld]() { calld->LogCancel(/*is_client=*/false); });
}
};
const grpc_channel_filter ServerLoggingFilter::kFilter =
grpc_core::MakePromiseBasedFilter<
ServerLoggingFilter, grpc_core::FilterEndpoint::kServer,
grpc_core::kFilterExaminesServerInitialMetadata |
grpc_core::kFilterExaminesInboundMessages |
grpc_core::kFilterExaminesOutboundMessages>("logging");
} // namespace
void RegisterLoggingFilter(LoggingSink* sink) {
g_logging_sink = sink;
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX,
[](grpc_core::ChannelStackBuilder* builder) {
// TODO(yashykt) : Figure out a good place to place this channel
// arg
if (builder->channel_args()
.GetInt("grpc.experimental.enable_observability")
.value_or(true)) {
builder->PrependFilter(&ServerLoggingFilter::kFilter);
}
return true;
});
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, INT_MAX,
[](grpc_core::ChannelStackBuilder* builder) {
// TODO(yashykt) : Figure out a good place to place this channel
// arg
if (builder->channel_args()
.GetInt("grpc.experimental.enable_observability")
.value_or(true)) {
builder->PrependFilter(&ClientLoggingFilter::kFilter);
}
return true;
});
});
}
} // namespace internal
} // namespace grpc

@ -0,0 +1,35 @@
//
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_LOGGING_LOGGING_FILTER_H
#define GRPC_INTERNAL_CPP_EXT_FILTERS_LOGGING_LOGGING_FILTER_H
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/logging/logging_sink.h"
namespace grpc {
namespace internal {
// TODO(yashykt): Add logging sink registration
void RegisterLoggingFilter(LoggingSink* sink);
} // namespace internal
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_EXT_FILTERS_LOGGING_LOGGING_FILTER_H

@ -38,14 +38,17 @@ class LoggingSink {
public:
class Config {
public:
// Constructs a default config which has logging disabled
Config() {}
Config(uint32_t max_metadata_bytes, uint32_t max_message_bytes)
: max_metadata_bytes_(max_metadata_bytes),
: enabled_(true),
max_metadata_bytes_(max_metadata_bytes),
max_message_bytes_(max_message_bytes) {}
bool MetadataLoggingEnabled() { return max_metadata_bytes_ != 0; }
bool MessageLoggingEnabled() { return max_message_bytes_ != 0; }
bool ShouldLog() {
return MetadataLoggingEnabled() || MessageLoggingEnabled();
}
bool ShouldLog() { return enabled_; }
uint32_t max_metadata_bytes() const { return max_metadata_bytes_; }
uint32_t max_message_bytes() const { return max_message_bytes_; }
bool operator==(const Config& other) const {
return max_metadata_bytes_ == other.max_metadata_bytes_ &&
@ -53,8 +56,9 @@ class LoggingSink {
}
private:
uint32_t max_metadata_bytes_;
uint32_t max_message_bytes_;
bool enabled_ = false;
uint32_t max_metadata_bytes_ = 0;
uint32_t max_message_bytes_ = 0;
};
struct Entry {
@ -83,9 +87,9 @@ class LoggingSink {
struct Address {
enum class Type { kUnknown = 0, kIpv4, kIpv6, kUnix };
Type type;
Type type = LoggingSink::Entry::Address::Type::kUnknown;
std::string address;
uint32_t ip_port;
uint32_t ip_port = 0;
};
uint64_t call_id = 0;
@ -102,7 +106,8 @@ class LoggingSink {
virtual ~LoggingSink() = default;
virtual Config FindMatch(bool is_client, absl::string_view path) = 0;
virtual Config FindMatch(bool is_client, absl::string_view service,
absl::string_view method) = 0;
virtual void LogEntry(Entry entry) = 0;
};

@ -20,8 +20,6 @@
#include "src/cpp/ext/gcp/observability_logging_sink.h"
#include <stddef.h>
#include <algorithm>
#include <map>
#include <utility>
@ -83,17 +81,11 @@ ObservabilityLoggingSink::ObservabilityLoggingSink(
}
LoggingSink::Config ObservabilityLoggingSink::FindMatch(
bool is_client, absl::string_view path) {
size_t pos = path.find('/');
if (pos == absl::string_view::npos) {
// bad path - did not find '/'
return LoggingSink::Config(0, 0);
}
absl::string_view service =
path.substr(0, pos); // service name is before the '/'
absl::string_view method =
path.substr(pos + 1); // method name starts after the '/'
bool is_client, absl::string_view service, absl::string_view method) {
const auto& configs = is_client ? client_configs_ : server_configs_;
if (service.empty() || method.empty()) {
return LoggingSink::Config();
}
for (const auto& config : configs) {
for (const auto& config_method : config.parsed_methods) {
if ((config_method.service == "*") ||
@ -101,14 +93,14 @@ LoggingSink::Config ObservabilityLoggingSink::FindMatch(
((config_method.method == "*") ||
(method == config_method.method)))) {
if (config.exclude) {
return LoggingSink::Config(0, 0);
return LoggingSink::Config();
}
return LoggingSink::Config(config.max_metadata_bytes,
config.max_message_bytes);
}
}
}
return LoggingSink::Config(0, 0);
return LoggingSink::Config();
}
namespace {
@ -202,9 +194,11 @@ void PeerToJsonStructProto(LoggingSink::Entry::Address peer,
::google::protobuf::Struct* peer_json) {
(*peer_json->mutable_fields())["type"].set_string_value(
AddressTypeToString(peer.type));
(*peer_json->mutable_fields())["address"].set_string_value(
std::move(peer.address));
(*peer_json->mutable_fields())["ipPort"].set_number_value(peer.ip_port);
if (peer.type != LoggingSink::Entry::Address::Type::kUnknown) {
(*peer_json->mutable_fields())["address"].set_string_value(
std::move(peer.address));
(*peer_json->mutable_fields())["ipPort"].set_number_value(peer.ip_port);
}
}
} // namespace

@ -48,8 +48,8 @@ class ObservabilityLoggingSink : public LoggingSink {
~ObservabilityLoggingSink() override = default;
LoggingSink::Config FindMatch(bool is_client,
absl::string_view path) override;
LoggingSink::Config FindMatch(bool is_client, absl::string_view service,
absl::string_view method) override;
void LogEntry(Entry entry) override;

@ -0,0 +1,45 @@
# gRPC Bazel BUILD file.
#
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
licenses(["notice"])
grpc_package(name = "test/cpp/ext/filters/logging")
grpc_cc_test(
name = "logging_test",
srcs = [
"logging_test.cc",
],
external_deps = [
"gtest",
],
language = "C++",
tags = [
"no_mac", # TODO(yashykt): The default stack-sizes are not enough to run this test on MacOS
],
deps = [
"//:grpc++",
"//src/cpp/ext/filters/logging:logging_filter",
"//src/cpp/ext/gcp:observability_logging_sink",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:test_service_impl",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
)

@ -0,0 +1,688 @@
//
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <chrono>
#include <thread> // NOLINT
#include "absl/strings/str_cat.h"
#include "gmock/gmock.h"
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include <grpc++/grpc++.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/cpp/ext/filters/logging/logging_filter.h"
#include "src/cpp/ext/gcp/observability_logging_sink.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "src/proto/grpc/testing/echo_messages.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
namespace grpc {
namespace testing {
namespace {
using grpc::internal::LoggingSink;
using ::testing::AllOf;
using ::testing::Eq;
using ::testing::Field;
using ::testing::Pair;
using ::testing::UnorderedElementsAre;
class MyTestServiceImpl : public TestServiceImpl {
public:
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
context->AddInitialMetadata("server-header-key", "server-header-value");
context->AddTrailingMetadata("server-trailer-key", "server-trailer-value");
return TestServiceImpl::Echo(context, request, response);
}
};
class TestLoggingSink : public grpc::internal::LoggingSink {
public:
Config FindMatch(bool /* is_client */, absl::string_view /* service */,
absl::string_view /* method */) override {
grpc_core::MutexLock lock(&mu_);
return config_;
}
void LogEntry(Entry entry) override {
::google::protobuf::Struct json;
grpc::internal::EntryToJsonStructProto(entry, &json);
std::string output;
::google::protobuf::TextFormat::PrintToString(json, &output);
gpr_log(GPR_ERROR, "%s", output.c_str());
grpc_core::MutexLock lock(&mu_);
entries_.push_back(std::move(entry));
}
void SetConfig(Config config) {
grpc_core::MutexLock lock(&mu_);
config_ = config;
}
std::vector<LoggingSink::Entry> entries() {
grpc_core::MutexLock lock(&mu_);
return entries_;
}
void Clear() {
grpc_core::MutexLock lock(&mu_);
entries_.clear();
}
private:
grpc_core::Mutex mu_;
std::vector<LoggingSink::Entry> entries_ GUARDED_BY(mu_);
Config config_ GUARDED_BY(mu_);
};
TestLoggingSink* g_test_logging_sink = nullptr;
class LoggingTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
g_test_logging_sink = new TestLoggingSink;
grpc::internal::RegisterLoggingFilter(g_test_logging_sink);
}
void SetUp() override {
// Clean up previous entries
g_test_logging_sink->Clear();
// Set up a synchronous server on a different thread to avoid the asynch
// interface.
grpc::ServerBuilder builder;
int port = grpc_pick_unused_port_or_die();
server_address_ = absl::StrCat("localhost:", port);
// Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
builder.AddListeningPort(server_address_, grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
ASSERT_NE(nullptr, server_);
server_thread_ = std::thread(&LoggingTest::RunServerLoop, this);
stub_ = EchoTestService::NewStub(grpc::CreateChannel(
server_address_, grpc::InsecureChannelCredentials()));
}
void ResetStub(std::shared_ptr<Channel> channel) {
stub_ = EchoTestService::NewStub(std::move(channel));
}
void TearDown() override {
server_->Shutdown();
server_thread_.join();
}
void RunServerLoop() { server_->Wait(); }
std::string server_address_;
MyTestServiceImpl service_;
std::unique_ptr<grpc::Server> server_;
std::thread server_thread_;
std::unique_ptr<EchoTestService::Stub> stub_;
};
TEST_F(LoggingTest, SimpleRpc) {
g_test_logging_sink->SetConfig(
grpc::internal::LoggingSink::Config(4096, 4096));
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
context.AddMetadata("client-key", "client-value");
grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_THAT(
g_test_logging_sink->entries(),
::testing::UnorderedElementsAre(
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("client-key", "client-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(5)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\003foo"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHalfClose)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo"))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair(
"server-header-key", "server-header-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(5)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\003foo"))))),
AllOf(
Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerTrailer)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair("server-trailer-key",
"server-trailer-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("client-key", "client-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(5)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\003foo"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHalfClose)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo"))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair(
"server-header-key", "server-header-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(5)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\003foo"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerTrailer)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("server-trailer-key",
"server-trailer-value")))))));
}
TEST_F(LoggingTest, LoggingDisabled) {
g_test_logging_sink->SetConfig(grpc::internal::LoggingSink::Config());
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
context.AddMetadata("client-key", "client-value");
grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_TRUE(g_test_logging_sink->entries().empty());
}
TEST_F(LoggingTest, MetadataTruncated) {
g_test_logging_sink->SetConfig(grpc::internal::LoggingSink::Config(
40 /* expect truncated metadata*/, 4096));
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
context.AddMetadata("client-key", "client-value");
context.AddMetadata("client-key-2", "client-value-2");
grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_THAT(
g_test_logging_sink->entries(),
::testing::UnorderedElementsAre(
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("client-key", "client-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(5)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\003foo"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHalfClose)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo"))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair(
"server-header-key", "server-header-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(5)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\003foo"))))),
AllOf(
Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerTrailer)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair("server-trailer-key",
"server-trailer-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("client-key", "client-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(5)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\003foo"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHalfClose)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo"))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair(
"server-header-key", "server-header-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(5)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\003foo"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerTrailer)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("server-trailer-key",
"server-trailer-value")))))));
}
TEST_F(LoggingTest, PayloadTruncated) {
g_test_logging_sink->SetConfig(grpc::internal::LoggingSink::Config(4096, 10));
EchoRequest request;
// The following message should get truncated
request.set_message("Hello World");
EchoResponse response;
grpc::ClientContext context;
context.AddMetadata("client-key", "client-value");
grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_THAT(
g_test_logging_sink->entries(),
::testing::UnorderedElementsAre(
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("client-key", "client-value"))))),
AllOf(
Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(13)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\013Hello Wo") /* truncated message */))),
Field(&LoggingSink::Entry::payload_truncated, true)),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHalfClose)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo"))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair(
"server-header-key", "server-header-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(13)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\013Hello Wo"))))),
AllOf(
Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerTrailer)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kClient)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair("server-trailer-key",
"server-trailer-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("client-key", "client-value"))))),
AllOf(
Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(13)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\013Hello Wo") /* truncated message */))),
Field(&LoggingSink::Entry::payload_truncated, true)),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kClientHalfClose)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo"))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerHeader)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(Pair(
"server-header-key", "server-header-value"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerMessage)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
AllOf(Field(&LoggingSink::Entry::Payload::message_length,
Eq(13)),
Field(&LoggingSink::Entry::Payload::message,
Eq("\n\013Hello Wo"))))),
AllOf(Field(&LoggingSink::Entry::type,
Eq(LoggingSink::Entry::EventType::kServerTrailer)),
Field(&LoggingSink::Entry::logger,
Eq(LoggingSink::Entry::Logger::kServer)),
Field(&LoggingSink::Entry::authority, Eq(server_address_)),
Field(&LoggingSink::Entry::service_name,
Eq("grpc.testing.EchoTestService")),
Field(&LoggingSink::Entry::method_name, Eq("Echo")),
Field(&LoggingSink::Entry::payload,
Field(&LoggingSink::Entry::Payload::metadata,
UnorderedElementsAre(
Pair("server-trailer-key",
"server-trailer-value")))))));
}
TEST_F(LoggingTest, CancelledRpc) {
g_test_logging_sink->SetConfig(
grpc::internal::LoggingSink::Config(4096, 4096));
EchoRequest request;
request.set_message("foo");
const int kCancelDelayUs = 10 * 1000;
request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
EchoResponse response;
grpc::ClientContext context;
context.AddMetadata("client-key", "client-value");
auto cancel_thread = std::thread(
[&context, this](int delay) {
std::this_thread::sleep_for(std::chrono::microseconds(delay));
while (!service_.signal_client()) {
}
context.TryCancel();
},
kCancelDelayUs);
grpc::Status status = stub_->Echo(&context, request, &response);
cancel_thread.join();
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
auto initial_time = absl::Now();
while (true) {
bool found_cancel_on_client = false;
bool found_cancel_on_server = false;
for (const auto& entry : g_test_logging_sink->entries()) {
if (entry.type == LoggingSink::Entry::EventType::kCancel) {
if (entry.logger == LoggingSink::Entry::Logger::kClient) {
found_cancel_on_client = true;
} else {
found_cancel_on_server = true;
}
}
}
if (found_cancel_on_client && found_cancel_on_server) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_LT(absl::Now() - initial_time, absl::Seconds(10));
}
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -27,6 +27,7 @@ grpc_cc_test(
"gtest",
],
language = "C++",
uses_polling = False,
deps = [
"//:grpcpp_gcp_observability",
"//test/cpp/util:test_util",
@ -42,6 +43,7 @@ grpc_cc_test(
"gtest",
],
language = "C++",
uses_polling = False,
deps = [
"//src/cpp/ext/gcp:observability_config",
"//test/cpp/util:test_util",
@ -57,6 +59,7 @@ grpc_cc_test(
"gtest",
],
language = "C++",
uses_polling = False,
deps = [
"//src/cpp/ext/gcp:observability_logging_sink",
"//test/cpp/util:test_util",

@ -40,9 +40,9 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigEmpty) {
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(true, "foo", "bar").ShouldLog());
// server test
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(false, "foo", "bar").ShouldLog());
}
TEST(GcpObservabilityLoggingSinkTest, LoggingConfigClientWildCardEntries) {
@ -65,9 +65,10 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigClientWildCardEntries) {
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(true, "foo", "bar"),
LoggingSink::Config(1024, 4096));
// server test
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(false, "foo", "bar").ShouldLog());
}
TEST(GcpObservabilityLoggingSinkTest, LoggingConfigBadPath) {
@ -89,7 +90,7 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigBadPath) {
*json, grpc_core::JsonArgs(), &errors);
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
EXPECT_EQ(sink.FindMatch(true, "foo"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(true, "foo", "").ShouldLog());
}
TEST(GcpObservabilityLoggingSinkTest,
@ -113,12 +114,12 @@ TEST(GcpObservabilityLoggingSinkTest,
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "service/bar"),
EXPECT_EQ(sink.FindMatch(true, "service", "bar"),
LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(true, "foo", "bar").ShouldLog());
// server test
EXPECT_EQ(sink.FindMatch(false, "service/bar"), LoggingSink::Config(0, 0));
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(false, "service", "bar").ShouldLog());
EXPECT_FALSE(sink.FindMatch(false, "foo", "bar").ShouldLog());
}
TEST(GcpObservabilityLoggingSinkTest,
@ -142,11 +143,13 @@ TEST(GcpObservabilityLoggingSinkTest,
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(true, "foo/baz"), LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(true, "foo", "bar"),
LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(true, "foo", "baz"),
LoggingSink::Config(1024, 4096));
// server test
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_EQ(sink.FindMatch(false, "foo/baz"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(false, "foo", "bar").ShouldLog());
EXPECT_FALSE(sink.FindMatch(false, "foo", "baz").ShouldLog());
}
TEST(GcpObservabilityLoggingSinkTest, LoggingConfigClientMultipleEventEntries) {
@ -174,11 +177,12 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigClientMultipleEventEntries) {
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(true, "foo/baz"), LoggingSink::Config(512, 2048));
EXPECT_EQ(sink.FindMatch(true, "foo", "bar"),
LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(true, "foo", "baz"), LoggingSink::Config(512, 2048));
// server test
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_EQ(sink.FindMatch(false, "foo/baz"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(false, "foo", "bar").ShouldLog());
EXPECT_FALSE(sink.FindMatch(false, "foo", "baz").ShouldLog());
}
TEST(GcpObservabilityLoggingSinkTest, LoggingConfigServerWildCardEntries) {
@ -201,9 +205,10 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigServerWildCardEntries) {
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(true, "foo", "bar").ShouldLog());
// server test
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(false, "foo", "bar"),
LoggingSink::Config(1024, 4096));
}
TEST(GcpObservabilityLoggingSinkTest,
@ -227,12 +232,12 @@ TEST(GcpObservabilityLoggingSinkTest,
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "service/bar"), LoggingSink::Config(0, 0));
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(true, "service", "bar").ShouldLog());
EXPECT_FALSE(sink.FindMatch(true, "foo", "bar").ShouldLog());
// server test
EXPECT_EQ(sink.FindMatch(false, "service/bar"),
EXPECT_EQ(sink.FindMatch(false, "service", "bar"),
LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(false, "foo", "bar").ShouldLog());
}
TEST(GcpObservabilityLoggingSinkTest,
@ -256,11 +261,13 @@ TEST(GcpObservabilityLoggingSinkTest,
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_EQ(sink.FindMatch(true, "foo/baz"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(true, "foo", "bar").ShouldLog());
EXPECT_FALSE(sink.FindMatch(true, "foo", "baz").ShouldLog());
// server test
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(false, "foo/baz"), LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(false, "foo", "bar"),
LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(false, "foo", "baz"),
LoggingSink::Config(1024, 4096));
}
TEST(GcpObservabilityLoggingSinkTest, LoggingConfigServerMultipleEventEntries) {
@ -288,11 +295,13 @@ TEST(GcpObservabilityLoggingSinkTest, LoggingConfigServerMultipleEventEntries) {
ASSERT_TRUE(errors.ok()) << errors.status("unexpected errors");
ObservabilityLoggingSink sink(config.cloud_logging.value(), "test");
// client test
EXPECT_EQ(sink.FindMatch(true, "foo/bar"), LoggingSink::Config(0, 0));
EXPECT_EQ(sink.FindMatch(true, "foo/baz"), LoggingSink::Config(0, 0));
EXPECT_FALSE(sink.FindMatch(true, "foo", "bar").ShouldLog());
EXPECT_FALSE(sink.FindMatch(true, "foo", "baz").ShouldLog());
// server test
EXPECT_EQ(sink.FindMatch(false, "foo/bar"), LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(false, "foo/baz"), LoggingSink::Config(512, 2048));
EXPECT_EQ(sink.FindMatch(false, "foo", "bar"),
LoggingSink::Config(1024, 4096));
EXPECT_EQ(sink.FindMatch(false, "foo", "baz"),
LoggingSink::Config(512, 2048));
}
TEST(EntryToJsonStructTest, ClientHeader) {

@ -656,7 +656,8 @@ def _exclude_unwanted_cc_tests(tests: List[str]) -> List[str]:
test for test in tests
if not test.startswith('test/cpp/ext/filters/census:') and
not test.startswith('test/core/xds:xds_channel_stack_modifier_test') and
not test.startswith('test/cpp/ext/gcp:')
not test.startswith('test/cpp/ext/gcp:') and
not test.startswith('test/cpp/ext/filters/logging:')
]
# missing opencensus/stats/stats.h
@ -1064,8 +1065,8 @@ _BUILD_EXTRA_METADATA = {
# TODO(jtattermusch): create_jwt and verify_jwt breaks distribtests because it depends on grpc_test_utils and thus requires tests to be built
# For now it's ok to disable them as these binaries aren't very useful anyway.
#'test/core/security:create_jwt': { 'language': 'c', 'build': 'tool', '_TYPE': 'target', '_RENAME': 'grpc_create_jwt' },
#'test/core/security:verify_jwt': { 'language': 'c', 'build': 'tool', '_TYPE': 'target', '_RENAME': 'grpc_verify_jwt' },
# 'test/core/security:create_jwt': { 'language': 'c', 'build': 'tool', '_TYPE': 'target', '_RENAME': 'grpc_create_jwt' },
# 'test/core/security:verify_jwt': { 'language': 'c', 'build': 'tool', '_TYPE': 'target', '_RENAME': 'grpc_verify_jwt' },
# TODO(jtattermusch): add remaining tools such as grpc_print_google_default_creds_token (they are not used by bazel build)

Loading…
Cancel
Save