[call-v3] Add client half close event edge to filters (#36598)

Closes #36598

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36598 from ctiller:transport-refs-4 c4835a8249
PiperOrigin-RevId: 634093923
pull/36608/head^2
Craig Tiller 10 months ago committed by Copybara-Service
parent 05ae7fb926
commit 545bd5171d
  1. 1
      BUILD
  2. 2
      build_autogenerated.yaml
  3. 1
      src/core/ext/filters/backend_metrics/backend_metric_filter.cc
  4. 1
      src/core/ext/filters/backend_metrics/backend_metric_filter.h
  5. 1
      src/core/ext/filters/fault_injection/fault_injection_filter.cc
  6. 1
      src/core/ext/filters/fault_injection/fault_injection_filter.h
  7. 1
      src/core/ext/filters/http/client/http_client_filter.cc
  8. 1
      src/core/ext/filters/http/client/http_client_filter.h
  9. 1
      src/core/ext/filters/http/client_authority_filter.cc
  10. 1
      src/core/ext/filters/http/client_authority_filter.h
  11. 2
      src/core/ext/filters/http/message_compress/compression_filter.cc
  12. 2
      src/core/ext/filters/http/message_compress/compression_filter.h
  13. 1
      src/core/ext/filters/http/server/http_server_filter.cc
  14. 1
      src/core/ext/filters/http/server/http_server_filter.h
  15. 1
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  16. 1
      src/core/ext/filters/load_reporting/server_load_reporting_filter.h
  17. 491
      src/core/ext/filters/logging/logging_filter.cc
  18. 77
      src/core/ext/filters/logging/logging_filter.h
  19. 45
      src/core/ext/filters/logging/logging_sink.h
  20. 2
      src/core/ext/filters/message_size/message_size_filter.cc
  21. 2
      src/core/ext/filters/message_size/message_size_filter.h
  22. 1
      src/core/ext/filters/rbac/rbac_filter.cc
  23. 1
      src/core/ext/filters/rbac/rbac_filter.h
  24. 1
      src/core/ext/filters/stateful_session/stateful_session_filter.cc
  25. 1
      src/core/ext/filters/stateful_session/stateful_session_filter.h
  26. 7
      src/core/lib/channel/context.h
  27. 301
      src/core/lib/channel/promise_based_filter.h
  28. 1
      src/core/lib/security/authorization/grpc_server_authz_filter.cc
  29. 1
      src/core/lib/security/authorization/grpc_server_authz_filter.h
  30. 1
      src/core/lib/security/transport/auth_filters.h
  31. 1
      src/core/lib/security/transport/server_auth_filter.cc
  32. 6
      src/core/lib/transport/call_filters.cc
  33. 71
      src/core/lib/transport/call_filters.h
  34. 1
      src/core/load_balancing/grpclb/client_load_reporting_filter.cc
  35. 1
      src/core/load_balancing/grpclb/client_load_reporting_filter.h
  36. 3
      src/core/resolver/xds/xds_resolver.cc
  37. 2
      src/core/server/server_call_tracer_filter.cc
  38. 2
      src/core/server/server_config_selector_filter.cc
  39. 3
      src/core/service_config/service_config_channel_arg_filter.cc
  40. 2
      test/core/surface/channel_init_test.cc
  41. 5
      test/core/transport/call_filters_test.cc
  42. 7
      test/core/transport/interception_chain_test.cc
  43. 1
      test/cpp/ext/filters/logging/BUILD
  44. 3
      test/cpp/ext/filters/logging/logging_test.cc

@ -2083,6 +2083,7 @@ grpc_cc_library(
"//src/core:call_final_info",
"//src/core:call_finalization",
"//src/core:call_spine",
"//src/core:cancel_callback",
"//src/core:channel_args",
"//src/core:channel_args_preconditioning",
"//src/core:channel_fwd",

@ -4611,6 +4611,7 @@ libs:
- src/core/lib/promise/activity.h
- src/core/lib/promise/all_ok.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/join_state.h
@ -11767,6 +11768,7 @@ targets:
- src/core/lib/promise/activity.h
- src/core/lib/promise/all_ok.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/join_state.h

@ -52,6 +52,7 @@ TraceFlag grpc_backend_metric_filter_trace(false, "backend_metric_filter");
const NoInterceptor BackendMetricFilter::Call::OnClientInitialMetadata;
const NoInterceptor BackendMetricFilter::Call::OnServerInitialMetadata;
const NoInterceptor BackendMetricFilter::Call::OnClientToServerMessage;
const NoInterceptor BackendMetricFilter::Call::OnClientToServerHalfClose;
const NoInterceptor BackendMetricFilter::Call::OnServerToClientMessage;
const NoInterceptor BackendMetricFilter::Call::OnFinalize;

@ -44,6 +44,7 @@ class BackendMetricFilter : public ImplementChannelFilter<BackendMetricFilter> {
static const NoInterceptor OnServerInitialMetadata;
void OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};

@ -58,6 +58,7 @@ TraceFlag grpc_fault_injection_filter_trace(false, "fault_injection_filter");
const NoInterceptor FaultInjectionFilter::Call::OnServerInitialMetadata;
const NoInterceptor FaultInjectionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor FaultInjectionFilter::Call::OnClientToServerMessage;
const NoInterceptor FaultInjectionFilter::Call::OnClientToServerHalfClose;
const NoInterceptor FaultInjectionFilter::Call::OnServerToClientMessage;
const NoInterceptor FaultInjectionFilter::Call::OnFinalize;

@ -58,6 +58,7 @@ class FaultInjectionFilter
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};

@ -54,6 +54,7 @@ namespace grpc_core {
const NoInterceptor HttpClientFilter::Call::OnServerToClientMessage;
const NoInterceptor HttpClientFilter::Call::OnClientToServerMessage;
const NoInterceptor HttpClientFilter::Call::OnClientToServerHalfClose;
const NoInterceptor HttpClientFilter::Call::OnFinalize;
const grpc_channel_filter HttpClientFilter::kFilter =

@ -47,6 +47,7 @@ class HttpClientFilter : public ImplementChannelFilter<HttpClientFilter> {
absl::Status OnServerInitialMetadata(ServerMetadata& md);
absl::Status OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};

@ -40,6 +40,7 @@ namespace grpc_core {
const NoInterceptor ClientAuthorityFilter::Call::OnServerInitialMetadata;
const NoInterceptor ClientAuthorityFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ClientAuthorityFilter::Call::OnClientToServerMessage;
const NoInterceptor ClientAuthorityFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ClientAuthorityFilter::Call::OnServerToClientMessage;
const NoInterceptor ClientAuthorityFilter::Call::OnFinalize;

@ -52,6 +52,7 @@ class ClientAuthorityFilter final
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};

@ -57,8 +57,10 @@
namespace grpc_core {
const NoInterceptor ServerCompressionFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ServerCompressionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ServerCompressionFilter::Call::OnFinalize;
const NoInterceptor ClientCompressionFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ClientCompressionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ClientCompressionFilter::Call::OnFinalize;

@ -129,6 +129,7 @@ class ClientCompressionFilter final
absl::StatusOr<MessageHandle> OnServerToClientMessage(
MessageHandle message, ClientCompressionFilter* filter);
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
@ -165,6 +166,7 @@ class ServerCompressionFilter final
MessageHandle OnServerToClientMessage(MessageHandle message,
ServerCompressionFilter* filter);
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;

@ -50,6 +50,7 @@
namespace grpc_core {
const NoInterceptor HttpServerFilter::Call::OnClientToServerMessage;
const NoInterceptor HttpServerFilter::Call::OnClientToServerHalfClose;
const NoInterceptor HttpServerFilter::Call::OnServerToClientMessage;
const NoInterceptor HttpServerFilter::Call::OnFinalize;

@ -50,6 +50,7 @@ class HttpServerFilter : public ImplementChannelFilter<HttpServerFilter> {
void OnServerInitialMetadata(ServerMetadata& md);
void OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};

@ -74,6 +74,7 @@ constexpr char kEmptyAddressLengthString[] = "00";
const NoInterceptor ServerLoadReportingFilter::Call::OnServerInitialMetadata;
const NoInterceptor ServerLoadReportingFilter::Call::OnClientToServerMessage;
const NoInterceptor ServerLoadReportingFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ServerLoadReportingFilter::Call::OnServerToClientMessage;
absl::StatusOr<std::unique_ptr<ServerLoadReportingFilter>>

@ -54,6 +54,7 @@ class ServerLoadReportingFilter
void OnServerTrailingMetadata(ServerMetadata& md,
ServerLoadReportingFilter* filter);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
void OnFinalize(const grpc_call_final_info* final_info,
ServerLoadReportingFilter* filter);

@ -73,6 +73,9 @@
namespace grpc_core {
const NoInterceptor ClientLoggingFilter::Call::OnFinalize;
const NoInterceptor ServerLoggingFilter::Call::OnFinalize;
namespace {
LoggingSink* g_logging_sink = nullptr;
@ -195,152 +198,148 @@ void EncodeMessageToPayload(const SliceBuffer* message, uint32_t log_len,
}
}
class CallData {
public:
CallData(bool is_client, const CallArgs& call_args,
const std::string& authority)
: call_id_(GetCallId()) {
absl::string_view path;
if (auto* value = call_args.client_initial_metadata->get_pointer(
HttpPathMetadata())) {
path = value->as_string_view();
}
std::vector<std::string> parts =
absl::StrSplit(path, '/', absl::SkipEmpty());
if (parts.size() == 2) {
service_name_ = std::move(parts[0]);
method_name_ = std::move(parts[1]);
} // namespace
namespace logging_filter_detail {
CallData::CallData(bool is_client,
const ClientMetadata& client_initial_metadata,
const std::string& authority)
: call_id_(GetCallId()) {
absl::string_view path;
if (auto* value = client_initial_metadata.get_pointer(HttpPathMetadata())) {
path = value->as_string_view();
}
std::vector<std::string> parts = absl::StrSplit(path, '/', absl::SkipEmpty());
if (parts.size() == 2) {
service_name_ = std::move(parts[0]);
method_name_ = std::move(parts[1]);
}
config_ = g_logging_sink->FindMatch(is_client, service_name_, method_name_);
if (config_.ShouldLog()) {
if (auto* value =
client_initial_metadata.get_pointer(HttpAuthorityMetadata())) {
authority_ = std::string(value->as_string_view());
} else {
authority_ = authority;
}
config_ = g_logging_sink->FindMatch(is_client, service_name_, method_name_);
if (config_.ShouldLog()) {
if (auto* value = call_args.client_initial_metadata->get_pointer(
HttpAuthorityMetadata())) {
authority_ = std::string(value->as_string_view());
} else {
authority_ = authority;
}
}
}
void CallData::LogClientHeader(bool is_client,
CallTracerAnnotationInterface* tracer,
const ClientMetadata& metadata) {
LoggingSink::Entry entry;
if (!is_client) {
if (auto* value = metadata.get_pointer(PeerString())) {
peer_ = PeerStringToAddress(*value);
}
}
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kClientHeader);
MetadataEncoder encoder(&entry.payload, nullptr,
config_.max_metadata_bytes());
metadata.Encode(&encoder);
entry.payload_truncated = encoder.truncated();
g_logging_sink->LogEntry(std::move(entry));
}
bool ShouldLog() { return config_.ShouldLog(); }
void CallData::LogClientHalfClose(bool is_client,
CallTracerAnnotationInterface* tracer) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kClientHalfClose);
g_logging_sink->LogEntry(std::move(entry));
}
void LogClientHeader(bool is_client, CallTracerAnnotationInterface* tracer,
const ClientMetadataHandle& metadata) {
LoggingSink::Entry entry;
if (!is_client) {
void CallData::LogServerHeader(bool is_client,
CallTracerAnnotationInterface* tracer,
const ServerMetadata* metadata) {
LoggingSink::Entry entry;
if (metadata != nullptr) {
entry.is_trailer_only = metadata->get(GrpcTrailersOnly()).value_or(false);
if (is_client) {
if (auto* value = metadata->get_pointer(PeerString())) {
peer_ = PeerStringToAddress(*value);
}
}
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kClientHeader);
}
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kServerHeader);
if (metadata != nullptr) {
MetadataEncoder encoder(&entry.payload, nullptr,
config_.max_metadata_bytes());
metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated();
g_logging_sink->LogEntry(std::move(entry));
}
void LogClientHalfClose(bool is_client,
CallTracerAnnotationInterface* tracer) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kClientHalfClose);
g_logging_sink->LogEntry(std::move(entry));
}
void LogServerHeader(bool is_client, CallTracerAnnotationInterface* tracer,
const ServerMetadata* metadata) {
LoggingSink::Entry entry;
if (metadata != nullptr) {
entry.is_trailer_only = metadata->get(GrpcTrailersOnly()).value_or(false);
if (is_client) {
if (auto* value = metadata->get_pointer(PeerString())) {
peer_ = PeerStringToAddress(*value);
}
}
}
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kServerHeader);
if (metadata != nullptr) {
MetadataEncoder encoder(&entry.payload, nullptr,
config_.max_metadata_bytes());
metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated();
}
g_logging_sink->LogEntry(std::move(entry));
}
g_logging_sink->LogEntry(std::move(entry));
}
void LogServerTrailer(bool is_client, CallTracerAnnotationInterface* tracer,
const ServerMetadata* metadata) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kServerTrailer);
if (metadata != nullptr) {
entry.is_trailer_only = metadata->get(GrpcTrailersOnly()).value_or(false);
MetadataEncoder encoder(&entry.payload, &entry.payload.status_details,
config_.max_metadata_bytes());
metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated();
}
g_logging_sink->LogEntry(std::move(entry));
void CallData::LogServerTrailer(bool is_client,
CallTracerAnnotationInterface* tracer,
const ServerMetadata* metadata) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kServerTrailer);
if (metadata != nullptr) {
entry.is_trailer_only = metadata->get(GrpcTrailersOnly()).value_or(false);
MetadataEncoder encoder(&entry.payload, &entry.payload.status_details,
config_.max_metadata_bytes());
metadata->Encode(&encoder);
entry.payload_truncated = encoder.truncated();
}
g_logging_sink->LogEntry(std::move(entry));
}
void LogClientMessage(bool is_client, CallTracerAnnotationInterface* tracer,
const SliceBuffer* message) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kClientMessage);
EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
g_logging_sink->LogEntry(std::move(entry));
}
void CallData::LogClientMessage(bool is_client,
CallTracerAnnotationInterface* tracer,
const SliceBuffer* message) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kClientMessage);
EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
g_logging_sink->LogEntry(std::move(entry));
}
void LogServerMessage(bool is_client, CallTracerAnnotationInterface* tracer,
const SliceBuffer* message) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kServerMessage);
EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
g_logging_sink->LogEntry(std::move(entry));
}
void CallData::LogServerMessage(bool is_client,
CallTracerAnnotationInterface* tracer,
const SliceBuffer* message) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kServerMessage);
EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
g_logging_sink->LogEntry(std::move(entry));
}
void LogCancel(bool is_client, CallTracerAnnotationInterface* tracer) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kCancel);
g_logging_sink->LogEntry(std::move(entry));
}
void CallData::LogCancel(bool is_client,
CallTracerAnnotationInterface* tracer) {
LoggingSink::Entry entry;
SetCommonEntryFields(&entry, is_client, tracer,
LoggingSink::Entry::EventType::kCancel);
g_logging_sink->LogEntry(std::move(entry));
}
private:
void SetCommonEntryFields(LoggingSink::Entry* entry, bool is_client,
CallTracerAnnotationInterface* tracer,
LoggingSink::Entry::EventType event_type) {
entry->call_id = call_id_;
entry->sequence_id = sequence_id_++;
entry->type = event_type;
entry->logger = is_client ? LoggingSink::Entry::Logger::kClient
: LoggingSink::Entry::Logger::kServer;
entry->authority = authority_;
entry->peer = peer_;
entry->service_name = service_name_;
entry->method_name = method_name_;
entry->timestamp = Timestamp::Now();
if (tracer != nullptr) {
entry->trace_id = tracer->TraceId();
entry->span_id = tracer->SpanId();
entry->is_sampled = tracer->IsSampled();
}
void CallData::SetCommonEntryFields(LoggingSink::Entry* entry, bool is_client,
CallTracerAnnotationInterface* tracer,
LoggingSink::Entry::EventType event_type) {
entry->call_id = call_id_;
entry->sequence_id = sequence_id_++;
entry->type = event_type;
entry->logger = is_client ? LoggingSink::Entry::Logger::kClient
: LoggingSink::Entry::Logger::kServer;
entry->authority = authority_;
entry->peer = peer_;
entry->service_name = service_name_;
entry->method_name = method_name_;
entry->timestamp = Timestamp::Now();
if (tracer != nullptr) {
entry->trace_id = tracer->TraceId();
entry->span_id = tracer->SpanId();
entry->is_sampled = tracer->IsSampled();
}
absl::uint128 call_id_;
uint32_t sequence_id_ = 0;
std::string service_name_;
std::string method_name_;
std::string authority_;
LoggingSink::Entry::Address peer_;
LoggingSink::Config config_;
};
}
} // namespace
} // namespace logging_filter_detail
absl::StatusOr<std::unique_ptr<ClientLoggingFilter>>
ClientLoggingFilter::Create(const ChannelArgs& args,
@ -361,84 +360,55 @@ ClientLoggingFilter::Create(const ChannelArgs& args,
return std::make_unique<ClientLoggingFilter>("");
}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> ClientLoggingFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
CallData* calld = GetContext<Arena>()->ManagedNew<CallData>(
true, call_args, default_authority_);
if (!calld->ShouldLog()) {
return next_promise_factory(std::move(call_args));
void ClientLoggingFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ClientLoggingFilter* filter) {
call_data_.emplace(true, md, filter->default_authority_);
if (!call_data_->ShouldLog()) {
call_data_.reset();
return;
}
call_data_->LogClientHeader(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>(), md);
}
void ClientLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
if (!call_data_.has_value()) return;
call_data_->LogServerHeader(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>(), &md);
}
void ClientLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
if (!call_data_.has_value()) return;
if (md.get(GrpcCallWasCancelled()).value_or(false) &&
md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
call_data_->LogCancel(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>());
return;
}
calld->LogClientHeader(
/*is_client=*/true,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
call_args.client_initial_metadata);
call_args.server_initial_metadata->InterceptAndMap(
[calld](ServerMetadataHandle metadata) {
calld->LogServerHeader(
/*is_client=*/true,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
metadata.get());
return metadata;
});
call_args.client_to_server_messages->InterceptAndMapWithHalfClose(
[calld](MessageHandle message) {
calld->LogClientMessage(
/*is_client=*/true,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
message->payload());
return message;
},
[calld] {
calld->LogClientHalfClose(
/*is_client=*/true,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value));
});
call_args.server_to_client_messages->InterceptAndMap(
[calld](MessageHandle message) {
calld->LogServerMessage(
/*is_client=*/true,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
message->payload());
return message;
});
return OnCancel(
Map(next_promise_factory(std::move(call_args)),
[calld](ServerMetadataHandle md) {
calld->LogServerTrailer(
/*is_client=*/true,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
md.get());
return md;
}),
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[calld, ctx = GetContext<grpc_call_context_element>()]() {
calld->LogCancel(
/*is_client=*/true,
static_cast<CallTracerAnnotationInterface*>(
ctx[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value));
});
call_data_->LogServerTrailer(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>(), &md);
}
void ClientLoggingFilter::Call::OnClientToServerMessage(
const Message& message) {
if (!call_data_.has_value()) return;
call_data_->LogClientMessage(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>(),
message.payload());
}
void ClientLoggingFilter::Call::OnClientToServerHalfClose() {
if (!call_data_.has_value()) return;
call_data_->LogClientHalfClose(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>());
}
void ClientLoggingFilter::Call::OnServerToClientMessage(
const Message& message) {
if (!call_data_.has_value()) return;
call_data_->LogServerMessage(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>(),
message.payload());
}
const grpc_channel_filter ClientLoggingFilter::kFilter =
@ -454,79 +424,54 @@ ServerLoggingFilter::Create(const ChannelArgs& /*args*/,
}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> ServerLoggingFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
CallData* calld = GetContext<Arena>()->ManagedNew<CallData>(
false, call_args, /*default_authority=*/"");
if (!calld->ShouldLog()) {
return next_promise_factory(std::move(call_args));
void ServerLoggingFilter::Call::OnClientInitialMetadata(ClientMetadata& md) {
call_data_.emplace(false, md, "");
if (!call_data_->ShouldLog()) {
call_data_.reset();
return;
}
auto* call_tracer = static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value);
calld->LogClientHeader(
/*is_client=*/false, call_tracer, call_args.client_initial_metadata);
call_args.server_initial_metadata->InterceptAndMap(
[calld](ServerMetadataHandle metadata) {
calld->LogServerHeader(
/*is_client=*/false,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
metadata.get());
return metadata;
});
call_args.client_to_server_messages->InterceptAndMapWithHalfClose(
[calld](MessageHandle message) {
calld->LogClientMessage(
/*is_client=*/false,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
message->payload());
return message;
},
[calld] {
calld->LogClientHalfClose(
/*is_client=*/false,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value));
});
call_args.server_to_client_messages->InterceptAndMap(
[calld](MessageHandle message) {
calld->LogServerMessage(
/*is_client=*/false,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
message->payload());
return message;
});
return OnCancel(
Map(next_promise_factory(std::move(call_args)),
[calld](ServerMetadataHandle md) {
calld->LogServerTrailer(
/*is_client=*/false,
static_cast<CallTracerAnnotationInterface*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value),
md.get());
return md;
}),
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture
// call_tracer.
[calld, call_tracer]() {
calld->LogCancel(
/*is_client=*/false, call_tracer);
});
call_data_->LogClientHeader(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>(), md);
}
void ServerLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
if (!call_data_.has_value()) return;
call_data_->LogServerHeader(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>(), &md);
}
void ServerLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
if (!call_data_.has_value()) return;
if (md.get(GrpcCallWasCancelled()).value_or(false) &&
md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
call_data_->LogCancel(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>());
return;
}
call_data_->LogServerTrailer(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>(), &md);
}
void ServerLoggingFilter::Call::OnClientToServerMessage(
const Message& message) {
if (!call_data_.has_value()) return;
call_data_->LogClientMessage(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>(),
message.payload());
}
void ServerLoggingFilter::Call::OnClientToServerHalfClose() {
if (!call_data_.has_value()) return;
call_data_->LogClientHalfClose(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>());
}
void ServerLoggingFilter::Call::OnServerToClientMessage(
const Message& message) {
if (!call_data_.has_value()) return;
call_data_->LogServerMessage(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>(),
message.payload());
}
const grpc_channel_filter ServerLoggingFilter::kFilter =

@ -35,7 +35,46 @@
namespace grpc_core {
class ClientLoggingFilter final : public ChannelFilter {
namespace logging_filter_detail {
class CallData {
public:
CallData(bool is_client, const ClientMetadata& client_initial_metadata,
const std::string& authority);
bool ShouldLog() { return config_.ShouldLog(); }
void LogClientHeader(bool is_client, CallTracerAnnotationInterface* tracer,
const ClientMetadata& metadata);
void LogClientHalfClose(bool is_client,
CallTracerAnnotationInterface* tracer);
void LogServerHeader(bool is_client, CallTracerAnnotationInterface* tracer,
const ServerMetadata* metadata);
void LogServerTrailer(bool is_client, CallTracerAnnotationInterface* tracer,
const ServerMetadata* metadata);
void LogClientMessage(bool is_client, CallTracerAnnotationInterface* tracer,
const SliceBuffer* message);
void LogServerMessage(bool is_client, CallTracerAnnotationInterface* tracer,
const SliceBuffer* message);
void LogCancel(bool is_client, CallTracerAnnotationInterface* tracer);
private:
void SetCommonEntryFields(LoggingSink::Entry* entry, bool is_client,
CallTracerAnnotationInterface* tracer,
LoggingSink::Entry::EventType event_type);
absl::uint128 call_id_;
uint32_t sequence_id_ = 0;
std::string service_name_;
std::string method_name_;
std::string authority_;
LoggingSink::Entry::Address peer_;
LoggingSink::Config config_;
};
} // namespace logging_filter_detail
class ClientLoggingFilter final
: public ImplementChannelFilter<ClientLoggingFilter> {
public:
static const grpc_channel_filter kFilter;
@ -45,24 +84,46 @@ class ClientLoggingFilter final : public ChannelFilter {
explicit ClientLoggingFilter(std::string default_authority)
: default_authority_(std::move(default_authority)) {}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md,
ClientLoggingFilter* filter);
void OnServerInitialMetadata(ServerMetadata& md);
void OnServerTrailingMetadata(ServerMetadata& md);
void OnClientToServerMessage(const Message& message);
void OnClientToServerHalfClose();
void OnServerToClientMessage(const Message& message);
static const NoInterceptor OnFinalize;
private:
absl::optional<logging_filter_detail::CallData> call_data_;
};
private:
const std::string default_authority_;
};
class ServerLoggingFilter final : public ChannelFilter {
class ServerLoggingFilter final
: public ImplementChannelFilter<ServerLoggingFilter> {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<std::unique_ptr<ServerLoggingFilter>> Create(
const ChannelArgs& args, ChannelFilter::Args /*filter_args*/);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md);
void OnServerInitialMetadata(ServerMetadata& md);
void OnServerTrailingMetadata(ServerMetadata& md);
void OnClientToServerMessage(const Message& message);
void OnClientToServerHalfClose();
void OnServerToClientMessage(const Message& message);
static const NoInterceptor OnFinalize;
private:
absl::optional<logging_filter_detail::CallData> call_data_;
};
};
void RegisterLoggingFilter(LoggingSink* sink);

@ -27,6 +27,7 @@
#include <string>
#include "absl/numeric/int128.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/gprpp/time.h"
@ -73,8 +74,42 @@ class LoggingSink {
kCancel
};
static std::string EventTypeString(EventType type) {
switch (type) {
case EventType::kUnknown:
return "UNKNOWN";
case EventType::kClientHeader:
return "CLIENT_HEADER";
case EventType::kServerHeader:
return "SERVER_HEADER";
case EventType::kClientMessage:
return "CLIENT_MESSAGE";
case EventType::kServerMessage:
return "SERVER_MESSAGE";
case EventType::kClientHalfClose:
return "CLIENT_HALF_CLOSE";
case EventType::kServerTrailer:
return "SERVER_TRAILER";
case EventType::kCancel:
return "CANCEL";
}
return absl::StrCat("INVALID(", static_cast<int>(type), ")");
}
enum class Logger { kUnknown = 0, kClient, kServer };
static std::string LoggerString(Logger logger) {
switch (logger) {
case Logger::kUnknown:
return "UNKNOWN";
case Logger::kClient:
return "CLIENT";
case Logger::kServer:
return "SERVER";
}
return absl::StrCat("INVALID(", static_cast<int>(logger), ")");
}
struct Payload {
std::map<std::string, std::string> metadata;
Duration timeout;
@ -118,6 +153,16 @@ class LoggingSink {
virtual void LogEntry(Entry entry) = 0;
};
inline std::ostream& operator<<(std::ostream& out,
const LoggingSink::Entry::EventType& type) {
return out << LoggingSink::Entry::EventTypeString(type);
}
inline std::ostream& operator<<(std::ostream& out,
const LoggingSink::Entry::Logger& logger) {
return out << LoggingSink::Entry::LoggerString(logger);
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_LOGGING_LOGGING_SINK_H

@ -51,10 +51,12 @@ namespace grpc_core {
const NoInterceptor ClientMessageSizeFilter::Call::OnClientInitialMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnServerInitialMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ClientMessageSizeFilter::Call::OnFinalize;
const NoInterceptor ServerMessageSizeFilter::Call::OnClientInitialMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnServerInitialMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ServerMessageSizeFilter::Call::OnFinalize;
//

@ -105,6 +105,7 @@ class ServerMessageSizeFilter final
static const NoInterceptor OnFinalize;
ServerMetadataHandle OnClientToServerMessage(
const Message& message, ServerMessageSizeFilter* filter);
static const NoInterceptor OnClientToServerHalfClose;
ServerMetadataHandle OnServerToClientMessage(
const Message& message, ServerMessageSizeFilter* filter);
};
@ -133,6 +134,7 @@ class ClientMessageSizeFilter final
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
ServerMetadataHandle OnClientToServerMessage(const Message& message);
static const NoInterceptor OnClientToServerHalfClose;
ServerMetadataHandle OnServerToClientMessage(const Message& message);
private:

@ -46,6 +46,7 @@ namespace grpc_core {
const NoInterceptor RbacFilter::Call::OnServerInitialMetadata;
const NoInterceptor RbacFilter::Call::OnServerTrailingMetadata;
const NoInterceptor RbacFilter::Call::OnClientToServerMessage;
const NoInterceptor RbacFilter::Call::OnClientToServerHalfClose;
const NoInterceptor RbacFilter::Call::OnServerToClientMessage;
const NoInterceptor RbacFilter::Call::OnFinalize;

@ -55,6 +55,7 @@ class RbacFilter : public ImplementChannelFilter<RbacFilter> {
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};

@ -60,6 +60,7 @@ namespace grpc_core {
TraceFlag grpc_stateful_session_filter_trace(false, "stateful_session_filter");
const NoInterceptor StatefulSessionFilter::Call::OnClientToServerMessage;
const NoInterceptor StatefulSessionFilter::Call::OnClientToServerHalfClose;
const NoInterceptor StatefulSessionFilter::Call::OnServerToClientMessage;
const NoInterceptor StatefulSessionFilter::Call::OnFinalize;

@ -86,6 +86,7 @@ class StatefulSessionFilter
void OnServerInitialMetadata(ServerMetadata& md);
void OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;

@ -72,6 +72,7 @@ struct grpc_call_context_element {
namespace grpc_core {
class Call;
class CallTracerAnnotationInterface;
// Bind the legacy context array into the new style structure
// TODO(ctiller): remove as we migrate these contexts to the new system.
@ -89,6 +90,12 @@ struct OldStyleContext<Call> {
static constexpr grpc_context_index kIndex = GRPC_CONTEXT_CALL;
};
template <>
struct OldStyleContext<CallTracerAnnotationInterface> {
static constexpr grpc_context_index kIndex =
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE;
};
template <typename T>
class Context<T, absl::void_t<decltype(OldStyleContext<T>::kIndex)>> {
public:

@ -59,6 +59,7 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
@ -354,31 +355,72 @@ template <typename Promise, typename Derived>
auto MapResult(absl::Status (Derived::Call::*fn)(ServerMetadata&), Promise x,
FilterCallData<Derived>* call_data) {
DCHECK(fn == &Derived::Call::OnServerTrailingMetadata);
return Map(std::move(x), [call_data](ServerMetadataHandle md) {
auto status = call_data->call.OnServerTrailingMetadata(*md);
if (!status.ok()) return ServerMetadataFromStatus(status);
return md;
});
return OnCancel(
Map(std::move(x),
[call_data](ServerMetadataHandle md) {
auto status = call_data->call.OnServerTrailingMetadata(*md);
if (!status.ok()) {
return ServerMetadataFromStatus(status);
}
return md;
}),
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[call_data, ctx = GetContext<grpc_call_context_element>()]() {
grpc_metadata_batch b;
b.Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
b.Set(GrpcCallWasCancelled(), true);
promise_detail::Context<grpc_call_context_element> context(ctx);
call_data->call.OnServerTrailingMetadata(b).IgnoreError();
});
}
template <typename Promise, typename Derived>
auto MapResult(void (Derived::Call::*fn)(ServerMetadata&), Promise x,
FilterCallData<Derived>* call_data) {
DCHECK(fn == &Derived::Call::OnServerTrailingMetadata);
return Map(std::move(x), [call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md);
return md;
});
return OnCancel(
Map(std::move(x),
[call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md);
return md;
}),
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[call_data, ctx = GetContext<grpc_call_context_element>()]() {
grpc_metadata_batch b;
b.Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
b.Set(GrpcCallWasCancelled(), true);
promise_detail::Context<grpc_call_context_element> context(ctx);
call_data->call.OnServerTrailingMetadata(b);
});
}
template <typename Promise, typename Derived>
auto MapResult(void (Derived::Call::*fn)(ServerMetadata&, Derived*), Promise x,
FilterCallData<Derived>* call_data) {
DCHECK(fn == &Derived::Call::OnServerTrailingMetadata);
return Map(std::move(x), [call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md, call_data->channel);
return md;
});
return OnCancel(
Map(std::move(x),
[call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md, call_data->channel);
return md;
}),
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[call_data, ctx = GetContext<grpc_call_context_element>()]() {
grpc_metadata_batch b;
b.Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
b.Set(GrpcCallWasCancelled(), true);
promise_detail::Context<grpc_call_context_element> context(ctx);
call_data->call.OnServerTrailingMetadata(b, call_data->channel);
});
}
template <typename Interceptor, typename Derived, typename SfinaeVoid = void>
@ -492,130 +534,193 @@ auto RunCall(Interceptor interceptor, CallArgs call_args,
std::move(call_args), std::move(next_promise_factory), call_data);
}
inline void InterceptClientToServerMessage(const NoInterceptor*, void*,
const CallArgs&) {}
template <typename Derived>
inline auto InterceptClientToServerMessageHandler(
void (Derived::Call::*fn)(const Message&),
FilterCallData<Derived>* call_data, const CallArgs&) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
return [call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
call_data->call.OnClientToServerMessage(*msg);
return std::move(msg);
};
}
template <typename Derived>
inline void InterceptClientToServerMessage(
inline auto InterceptClientToServerMessageHandler(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
FilterCallData<Derived>* call_data, const CallArgs&) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call_data->call.OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(std::move(return_md));
return absl::nullopt;
});
return [call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call_data->call.OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(std::move(return_md));
return absl::nullopt;
};
}
template <typename Derived>
inline void InterceptClientToServerMessage(
inline auto InterceptClientToServerMessageHandler(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
FilterCallData<Derived>* call_data, const CallArgs&) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md =
call_data->call.OnClientToServerMessage(*msg, call_data->channel);
if (return_md == nullptr) return std::move(msg);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(std::move(return_md));
return absl::nullopt;
});
return [call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md =
call_data->call.OnClientToServerMessage(*msg, call_data->channel);
if (return_md == nullptr) return std::move(msg);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(std::move(return_md));
return absl::nullopt;
};
}
template <typename Derived>
inline void InterceptClientToServerMessage(
inline auto InterceptClientToServerMessageHandler(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
FilterCallData<Derived>* call_data, const CallArgs&) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
return call_data->call.OnClientToServerMessage(std::move(msg),
call_data->channel);
});
return [call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
return call_data->call.OnClientToServerMessage(std::move(msg),
call_data->channel);
};
}
template <typename Derived>
inline void InterceptClientToServerMessage(
inline auto InterceptClientToServerMessageHandler(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
FilterCallData<Derived>* call_data, const CallArgs&) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
return [call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call_data->call.OnClientToServerMessage(std::move(msg),
call_data->channel);
if (r.ok()) return std::move(*r);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
};
}
template <typename Derived, typename HookFunction>
inline void InterceptClientToServerMessage(HookFunction hook,
const NoInterceptor*,
FilterCallData<Derived>* call_data,
const CallArgs& call_args) {
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call_data->call.OnClientToServerMessage(std::move(msg),
call_data->channel);
if (r.ok()) return std::move(*r);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
InterceptClientToServerMessageHandler(hook, call_data, call_args));
}
inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
template <typename Derived, typename HookFunction>
inline void InterceptClientToServerMessage(HookFunction hook,
void (Derived::Call::*)(),
FilterCallData<Derived>* call_data,
const CallArgs& call_args) {
call_args.client_to_server_messages->InterceptAndMapWithHalfClose(
InterceptClientToServerMessageHandler(hook, call_data, call_args),
[call_data]() { call_data->call.OnClientToServerHalfClose(); });
}
template <typename Derived>
inline void InterceptClientToServerMessage(
inline void InterceptClientToServerMessage(const NoInterceptor*,
const NoInterceptor*,
FilterCallData<Derived>*,
const CallArgs&) {}
template <typename Derived>
inline auto InterceptClientToServerMessageHandler(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
return
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
};
}
template <typename Derived>
inline void InterceptClientToServerMessage(
inline auto InterceptClientToServerMessageHandler(
void (Derived::Call::*fn)(const Message&), typename Derived::Call* call,
Derived*, PipeBasedCallSpine*) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
return [call](MessageHandle msg) -> absl::optional<MessageHandle> {
call->OnClientToServerMessage(*msg);
return std::move(msg);
};
}
template <typename Derived>
inline auto InterceptClientToServerMessageHandler(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
typename Derived::Call* call, Derived* channel,
PipeBasedCallSpine* call_spine) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg, channel);
if (return_md == nullptr) return std::move(msg);
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
return [call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg, channel);
if (return_md == nullptr) return std::move(msg);
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
};
}
template <typename Derived>
inline void InterceptClientToServerMessage(
inline auto InterceptClientToServerMessageHandler(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
PipeBasedCallSpine* call_spine) {
typename Derived::Call* call, Derived* channel, PipeBasedCallSpine*) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, channel](MessageHandle msg) {
return call->OnClientToServerMessage(std::move(msg), channel);
});
return [call, channel](MessageHandle msg) {
return call->OnClientToServerMessage(std::move(msg), channel);
};
}
template <typename Derived>
inline void InterceptClientToServerMessage(
inline auto InterceptClientToServerMessageHandler(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
PipeBasedCallSpine* call_spine) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
return [call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnClientToServerMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(r.status()));
return absl::nullopt;
};
}
template <typename Derived, typename HookFunction>
inline void InterceptClientToServerMessage(HookFunction fn,
const NoInterceptor*,
typename Derived::Call* call,
Derived* channel,
PipeBasedCallSpine* call_spine) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnClientToServerMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
InterceptClientToServerMessageHandler(fn, call, channel, call_spine));
}
template <typename Derived, typename HookFunction>
inline void InterceptClientToServerMessage(HookFunction fn,
void (Derived::Call::*half_close)(),
typename Derived::Call* call,
Derived* channel,
PipeBasedCallSpine* call_spine) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
DCHECK(half_close == &Derived::Call::OnClientToServerHalfClose);
call_spine->client_to_server_messages().receiver.InterceptAndMapWithHalfClose(
InterceptClientToServerMessageHandler(fn, call, channel, call_spine),
[call]() { call->OnClientToServerHalfClose(); });
}
template <typename Derived>
inline void InterceptClientToServerMessage(const NoInterceptor*,
const NoInterceptor*,
typename Derived::Call*, Derived*,
PipeBasedCallSpine*) {}
inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*,
PipeBasedCallSpine*) {}
@ -861,6 +966,18 @@ inline void InterceptServerInitialMetadata(
inline void InterceptServerToClientMessage(const NoInterceptor*, void*,
const CallArgs&) {}
template <typename Derived>
inline void InterceptServerToClientMessage(
void (Derived::Call::*fn)(const Message&),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
DCHECK(fn == &Derived::Call::OnServerToClientMessage);
call_args.server_to_client_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
call_data->call.OnServerToClientMessage(*msg);
return std::move(msg);
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
@ -923,6 +1040,18 @@ inline void InterceptServerToClientMessage(
inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
template <typename Derived>
inline void InterceptServerToClientMessage(
void (Derived::Call::*fn)(const Message&), typename Derived::Call* call,
Derived*, PipeBasedCallSpine* call_spine) {
DCHECK(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call](MessageHandle msg) -> absl::optional<MessageHandle> {
call->OnServerToClientMessage(*msg);
return std::move(msg);
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
@ -1120,7 +1249,8 @@ class ImplementChannelFilter : public ChannelFilter,
promise_filter_detail::InterceptClientInitialMetadata(
&Derived::Call::OnClientInitialMetadata, call, d, c);
promise_filter_detail::InterceptClientToServerMessage(
&Derived::Call::OnClientToServerMessage, call, d, c);
&Derived::Call::OnClientToServerMessage,
&Derived::Call::OnClientToServerHalfClose, call, d, c);
promise_filter_detail::InterceptServerInitialMetadata(
&Derived::Call::OnServerInitialMetadata, call, d, c);
promise_filter_detail::InterceptServerToClientMessage(
@ -1139,7 +1269,8 @@ class ImplementChannelFilter : public ChannelFilter,
auto* call = promise_filter_detail::MakeFilterCall<Derived>(
static_cast<Derived*>(this));
promise_filter_detail::InterceptClientToServerMessage(
&Derived::Call::OnClientToServerMessage, call, call_args);
&Derived::Call::OnClientToServerMessage,
&Derived::Call::OnClientToServerHalfClose, call, call_args);
promise_filter_detail::InterceptServerInitialMetadata(
&Derived::Call::OnServerInitialMetadata, call, call_args);
promise_filter_detail::InterceptServerToClientMessage(

@ -41,6 +41,7 @@ TraceFlag grpc_authz_trace(false, "grpc_authz_api");
const NoInterceptor GrpcServerAuthzFilter::Call::OnServerInitialMetadata;
const NoInterceptor GrpcServerAuthzFilter::Call::OnServerTrailingMetadata;
const NoInterceptor GrpcServerAuthzFilter::Call::OnClientToServerMessage;
const NoInterceptor GrpcServerAuthzFilter::Call::OnClientToServerHalfClose;
const NoInterceptor GrpcServerAuthzFilter::Call::OnServerToClientMessage;
const NoInterceptor GrpcServerAuthzFilter::Call::OnFinalize;

@ -51,6 +51,7 @@ class GrpcServerAuthzFilter final
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};

@ -115,6 +115,7 @@ class ServerAuthFilter final : public ImplementChannelFilter<ServerAuthFilter> {
}
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;

@ -68,6 +68,7 @@ const grpc_channel_filter ServerAuthFilter::kFilter =
"server-auth");
const NoInterceptor ServerAuthFilter::Call::OnClientToServerMessage;
const NoInterceptor ServerAuthFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ServerAuthFilter::Call::OnServerToClientMessage;
const NoInterceptor ServerAuthFilter::Call::OnServerInitialMetadata;
const NoInterceptor ServerAuthFilter::Call::OnServerTrailingMetadata;

@ -29,6 +29,12 @@ void* Offset(void* base, size_t amt) { return static_cast<char*>(base) + amt; }
namespace filters_detail {
void RunHalfClose(absl::Span<const HalfCloseOperator> ops, void* call_data) {
for (const auto& op : ops) {
op.half_close(Offset(call_data, op.call_offset), op.channel_data);
}
}
template <typename T>
OperationExecutor<T>::~OperationExecutor() {
if (promise_data_ != nullptr) {

@ -43,6 +43,7 @@
// - OnServerInitialMetadata - $VALUE_TYPE = ServerMetadata
// - OnServerToClientMessage - $VALUE_TYPE = Message
// - OnClientToServerMessage - $VALUE_TYPE = Message
// - OnClientToServerHalfClose - no value
// - OnServerTrailingMetadata - $VALUE_TYPE = ServerMetadata
// - OnFinalize - special, see below
// These members define an interception point for a particular event in
@ -192,6 +193,16 @@ struct Operator {
void (*early_destroy)(void* promise_data);
};
struct HalfCloseOperator {
// Pointer to corresponding channel data for this filter
void* channel_data;
// Offset of the call data for this filter within the call data memory
size_t call_offset;
void (*half_close)(void* call_data, void* channel_data);
};
void RunHalfClose(absl::Span<const HalfCloseOperator> ops, void* call_data);
// We divide operations into fallible and infallible.
// Fallible operations can fail, and that failure terminates the call.
// Infallible operations cannot fail.
@ -265,6 +276,32 @@ void AddOp(FilterType* channel_data, size_t call_offset,
to);
}
template <typename FilterType>
void AddHalfClose(FilterType* channel_data, size_t call_offset,
void (FilterType::Call::*)(),
std::vector<HalfCloseOperator>& to) {
to.push_back(
HalfCloseOperator{channel_data, call_offset, [](void* call_data, void*) {
static_cast<typename FilterType::Call*>(call_data)
->OnClientToServerHalfClose();
}});
}
template <typename FilterType>
void AddHalfClose(FilterType* channel_data, size_t call_offset,
void (FilterType::Call::*)(FilterType*),
std::vector<HalfCloseOperator>& to) {
to.push_back(HalfCloseOperator{
channel_data, call_offset, [](void* call_data, void* channel_data) {
static_cast<typename FilterType::Call*>(call_data)
->OnClientToServerHalfClose(static_cast<FilterType*>(channel_data));
}});
}
template <typename FilterType>
void AddHalfClose(FilterType*, size_t, const NoInterceptor*,
std::vector<HalfCloseOperator>&) {}
// const NoInterceptor $EVENT
// These do nothing, and specifically DO NOT add an operation to the layout.
// Supported for fallible & infallible operations.
@ -852,6 +889,7 @@ struct StackData {
Layout<FallibleOperator<ClientMetadataHandle>> client_initial_metadata;
Layout<FallibleOperator<ServerMetadataHandle>> server_initial_metadata;
Layout<FallibleOperator<MessageHandle>> client_to_server_messages;
std::vector<HalfCloseOperator> client_to_server_half_close;
Layout<FallibleOperator<MessageHandle>> server_to_client_messages;
Layout<InfallibleOperator<ServerMetadataHandle>> server_trailing_metadata;
// A list of finalizers for this call.
@ -972,6 +1010,14 @@ struct StackData {
channel_data, call_offset, client_to_server_messages);
}
template <typename FilterType>
void AddClientToServerHalfClose(FilterType* channel_data,
size_t call_offset) {
AddHalfClose(channel_data, call_offset,
&FilterType::Call::OnClientToServerHalfClose,
client_to_server_half_close);
}
template <typename FilterType>
void AddServerToClientMessageOp(FilterType* channel_data,
size_t call_offset) {
@ -1217,6 +1263,7 @@ class ServerTrailingMetadataInterceptor {
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
void OnServerTrailingMetadata(ServerMetadata& md,
@ -1240,6 +1287,9 @@ template <typename Fn>
const NoInterceptor
ServerTrailingMetadataInterceptor<Fn>::Call::OnClientToServerMessage;
template <typename Fn>
const NoInterceptor
ServerTrailingMetadataInterceptor<Fn>::Call::OnClientToServerHalfClose;
template <typename Fn>
const NoInterceptor
ServerTrailingMetadataInterceptor<Fn>::Call::OnServerToClientMessage;
template <typename Fn>
@ -1256,6 +1306,7 @@ class ClientInitialMetadataInterceptor {
}
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
@ -1273,6 +1324,9 @@ template <typename Fn>
const NoInterceptor
ClientInitialMetadataInterceptor<Fn>::Call::OnClientToServerMessage;
template <typename Fn>
const NoInterceptor
ClientInitialMetadataInterceptor<Fn>::Call::OnClientToServerHalfClose;
template <typename Fn>
const NoInterceptor
ClientInitialMetadataInterceptor<Fn>::Call::OnServerToClientMessage;
template <typename Fn>
@ -1319,6 +1373,7 @@ class CallFilters {
data_.AddClientInitialMetadataOp(filter, call_offset);
data_.AddServerInitialMetadataOp(filter, call_offset);
data_.AddClientToServerMessageOp(filter, call_offset);
data_.AddClientToServerHalfClose(filter, call_offset);
data_.AddServerToClientMessageOp(filter, call_offset);
data_.AddServerTrailingMetadataOp(filter, call_offset);
data_.AddFinalizer(filter, call_offset, &FilterType::Call::OnFinalize);
@ -1593,6 +1648,8 @@ class CallFilters {
filters_detail::OperationExecutor<T> executor_;
};
template <std::vector<filters_detail::HalfCloseOperator>(
filters_detail::StackData::*half_close_layout_ptr)>
class PullMessage {
public:
explicit PullMessage(CallFilters* filters) : filters_(filters) {}
@ -1637,7 +1694,14 @@ class CallFilters {
filters_->CancelDueToFailedPipeOperation();
return Failure{};
}
if (!**r) return absl::nullopt;
if (!**r) {
if (half_close_layout_ptr != nullptr) {
filters_detail::RunHalfClose(
filters_->stack_->data_.*half_close_layout_ptr,
filters_->call_data_);
}
return absl::nullopt;
}
CHECK(filters_ != nullptr);
return FinishOperationExecutor(executor_.Start(
layout(), push()->TakeValue(), filters_->call_data_));
@ -1865,7 +1929,8 @@ inline auto CallFilters::PushClientToServerMessage(MessageHandle message) {
}
inline auto CallFilters::PullClientToServerMessage() {
return ClientToServerMessagePromises::PullMessage{this};
return ClientToServerMessagePromises::PullMessage<
&filters_detail::StackData::client_to_server_half_close>{this};
}
inline auto CallFilters::PushServerToClientMessage(MessageHandle message) {
@ -1875,7 +1940,7 @@ inline auto CallFilters::PushServerToClientMessage(MessageHandle message) {
}
inline auto CallFilters::PullServerToClientMessage() {
return ServerToClientMessagePromises::PullMessage{this};
return ServerToClientMessagePromises::PullMessage<nullptr>{this};
}
inline auto CallFilters::PullServerTrailingMetadata() {

@ -41,6 +41,7 @@ namespace grpc_core {
const NoInterceptor ClientLoadReportingFilter::Call::OnServerToClientMessage;
const NoInterceptor ClientLoadReportingFilter::Call::OnClientToServerMessage;
const NoInterceptor ClientLoadReportingFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ClientLoadReportingFilter::Call::OnFinalize;
const grpc_channel_filter ClientLoadReportingFilter::kFilter =

@ -43,6 +43,7 @@ class ClientLoadReportingFilter final
void OnServerTrailingMetadata(ServerMetadata& server_trailing_metadata);
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnFinalize;
private:

@ -330,6 +330,7 @@ class XdsResolver final : public Resolver {
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
@ -383,6 +384,8 @@ const NoInterceptor
XdsResolver::ClusterSelectionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor
XdsResolver::ClusterSelectionFilter::Call::OnClientToServerMessage;
const NoInterceptor
XdsResolver::ClusterSelectionFilter::Call::OnClientToServerHalfClose;
const NoInterceptor
XdsResolver::ClusterSelectionFilter::Call::OnServerToClientMessage;
const NoInterceptor XdsResolver::ClusterSelectionFilter::Call::OnFinalize;

@ -80,6 +80,7 @@ class ServerCallTracerFilter
}
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
private:
@ -92,6 +93,7 @@ class ServerCallTracerFilter
};
const NoInterceptor ServerCallTracerFilter::Call::OnClientToServerMessage;
const NoInterceptor ServerCallTracerFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ServerCallTracerFilter::Call::OnServerToClientMessage;
const grpc_channel_filter ServerCallTracerFilter::kFilter =

@ -72,6 +72,7 @@ class ServerConfigSelectorFilter final
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
@ -158,6 +159,7 @@ absl::Status ServerConfigSelectorFilter::Call::OnClientInitialMetadata(
const NoInterceptor ServerConfigSelectorFilter::Call::OnServerInitialMetadata;
const NoInterceptor ServerConfigSelectorFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ServerConfigSelectorFilter::Call::OnClientToServerMessage;
const NoInterceptor ServerConfigSelectorFilter::Call::OnClientToServerHalfClose;
const NoInterceptor ServerConfigSelectorFilter::Call::OnServerToClientMessage;
const NoInterceptor ServerConfigSelectorFilter::Call::OnFinalize;

@ -83,6 +83,7 @@ class ServiceConfigChannelArgFilter final
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
@ -97,6 +98,8 @@ const NoInterceptor
ServiceConfigChannelArgFilter::Call::OnServerTrailingMetadata;
const NoInterceptor
ServiceConfigChannelArgFilter::Call::OnClientToServerMessage;
const NoInterceptor
ServiceConfigChannelArgFilter::Call::OnClientToServerHalfClose;
const NoInterceptor
ServiceConfigChannelArgFilter::Call::OnServerToClientMessage;
const NoInterceptor ServiceConfigChannelArgFilter::Call::OnFinalize;

@ -229,6 +229,7 @@ class TestFilter1 {
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
@ -245,6 +246,7 @@ const NoInterceptor TestFilter1::Call::OnClientInitialMetadata;
const NoInterceptor TestFilter1::Call::OnServerInitialMetadata;
const NoInterceptor TestFilter1::Call::OnServerTrailingMetadata;
const NoInterceptor TestFilter1::Call::OnClientToServerMessage;
const NoInterceptor TestFilter1::Call::OnClientToServerHalfClose;
const NoInterceptor TestFilter1::Call::OnServerToClientMessage;
const NoInterceptor TestFilter1::Call::OnFinalize;

@ -1331,6 +1331,7 @@ TEST(CallFiltersTest, CanBuildStack) {
void OnClientInitialMetadata(ClientMetadata&) {}
void OnServerInitialMetadata(ServerMetadata&) {}
void OnClientToServerMessage(Message&) {}
void OnClientToServerHalfClose() {}
void OnServerToClientMessage(Message&) {}
void OnServerTrailingMetadata(ServerMetadata&) {}
void OnFinalize(const grpc_call_final_info*) {}
@ -1355,6 +1356,10 @@ TEST(CallFiltersTest, UnaryCall) {
void OnClientToServerMessage(Message&, Filter* f) {
f->steps.push_back(absl::StrCat(f->label, ":OnClientToServerMessage"));
}
void OnClientToServerHalfClose(Filter* f) {
f->steps.push_back(
absl::StrCat(f->label, ":OnClientToServerHalfClose"));
}
void OnServerToClientMessage(Message&, Filter* f) {
f->steps.push_back(absl::StrCat(f->label, ":OnServerToClientMessage"));
}

@ -81,6 +81,7 @@ class TestFilter {
}
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
@ -101,6 +102,8 @@ const NoInterceptor TestFilter<I>::Call::OnServerInitialMetadata;
template <int I>
const NoInterceptor TestFilter<I>::Call::OnClientToServerMessage;
template <int I>
const NoInterceptor TestFilter<I>::Call::OnClientToServerHalfClose;
template <int I>
const NoInterceptor TestFilter<I>::Call::OnServerToClientMessage;
template <int I>
const NoInterceptor TestFilter<I>::Call::OnServerTrailingMetadata;
@ -118,6 +121,7 @@ class FailsToInstantiateFilter {
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
@ -137,6 +141,9 @@ const NoInterceptor FailsToInstantiateFilter<I>::Call::OnServerInitialMetadata;
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnClientToServerMessage;
template <int I>
const NoInterceptor
FailsToInstantiateFilter<I>::Call::OnClientToServerHalfClose;
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnServerToClientMessage;
template <int I>
const NoInterceptor FailsToInstantiateFilter<I>::Call::OnServerTrailingMetadata;

@ -57,6 +57,7 @@ grpc_cc_test(
deps = [
":logging_test_library",
"//:grpc++",
"//src/core:dump_args",
"//src/core:logging_filter",
"//src/cpp/ext/gcp:observability_logging_sink",
"//src/proto/grpc/testing:echo_proto",

@ -29,6 +29,7 @@
#include <grpcpp/support/status.h>
#include "src/core/ext/filters/logging/logging_filter.h"
#include "src/core/lib/gprpp/dump_args.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/cpp/ext/gcp/observability_logging_sink.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -843,7 +844,7 @@ TEST_F(LoggingTest, CancelledRpc) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_LT(absl::Now() - initial_time, absl::Seconds(10));
ASSERT_LT(absl::Now() - initial_time, absl::Seconds(10));
}
}

Loading…
Cancel
Save