pull/36598/head
Craig Tiller 11 months ago
parent 881fc8ef62
commit ddc6127e27
  1. 1
      BUILD
  2. 13
      src/core/ext/filters/logging/logging_filter.cc
  3. 45
      src/core/ext/filters/logging/logging_sink.h
  4. 68
      src/core/lib/channel/promise_based_filter.h
  5. 1
      test/cpp/ext/filters/logging/BUILD
  6. 3
      test/cpp/ext/filters/logging/logging_test.cc

@ -2077,6 +2077,7 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:atomic_utils",
"//src/core:bitset",
"//src/core:cancel_callback",
"//src/core:call_destination",
"//src/core:call_filters",
"//src/core:call_final_info",

@ -290,6 +290,7 @@ void CallData::LogServerTrailer(bool is_client,
}
g_logging_sink->LogEntry(std::move(entry));
}
void CallData::LogClientMessage(bool is_client,
CallTracerAnnotationInterface* tracer,
const SliceBuffer* message) {
@ -378,6 +379,12 @@ void ClientLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
void ClientLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
if (!call_data_.has_value()) return;
if (md.get(GrpcCallWasCancelled()).value_or(false) &&
md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
call_data_->LogCancel(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>());
return;
}
call_data_->LogServerTrailer(
/*is_client=*/true, GetContext<CallTracerAnnotationInterface>(), &md);
}
@ -436,6 +443,12 @@ void ServerLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
void ServerLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
if (!call_data_.has_value()) return;
if (md.get(GrpcCallWasCancelled()).value_or(false) &&
md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
call_data_->LogCancel(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>());
return;
}
call_data_->LogServerTrailer(
/*is_client=*/false, GetContext<CallTracerAnnotationInterface>(), &md);
}

@ -27,6 +27,7 @@
#include <string>
#include "absl/numeric/int128.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/gprpp/time.h"
@ -73,8 +74,42 @@ class LoggingSink {
kCancel
};
static std::string EventTypeString(EventType type) {
switch (type) {
case EventType::kUnknown:
return "UNKNOWN";
case EventType::kClientHeader:
return "CLIENT_HEADER";
case EventType::kServerHeader:
return "SERVER_HEADER";
case EventType::kClientMessage:
return "CLIENT_MESSAGE";
case EventType::kServerMessage:
return "SERVER_MESSAGE";
case EventType::kClientHalfClose:
return "CLIENT_HALF_CLOSE";
case EventType::kServerTrailer:
return "SERVER_TRAILER";
case EventType::kCancel:
return "CANCEL";
}
return absl::StrCat("INVALID(", static_cast<int>(type), ")");
}
enum class Logger { kUnknown = 0, kClient, kServer };
static std::string LoggerString(Logger logger) {
switch (logger) {
case Logger::kUnknown:
return "UNKNOWN";
case Logger::kClient:
return "CLIENT";
case Logger::kServer:
return "SERVER";
}
return absl::StrCat("INVALID(", static_cast<int>(logger), ")");
}
struct Payload {
std::map<std::string, std::string> metadata;
Duration timeout;
@ -118,6 +153,16 @@ class LoggingSink {
virtual void LogEntry(Entry entry) = 0;
};
inline std::ostream& operator<<(std::ostream& out,
const LoggingSink::Entry::EventType& type) {
return out << LoggingSink::Entry::EventTypeString(type);
}
inline std::ostream& operator<<(std::ostream& out,
const LoggingSink::Entry::Logger& logger) {
return out << LoggingSink::Entry::LoggerString(logger);
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_LOGGING_LOGGING_SINK_H

@ -59,6 +59,7 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
@ -354,31 +355,72 @@ template <typename Promise, typename Derived>
auto MapResult(absl::Status (Derived::Call::*fn)(ServerMetadata&), Promise x,
FilterCallData<Derived>* call_data) {
DCHECK(fn == &Derived::Call::OnServerTrailingMetadata);
return Map(std::move(x), [call_data](ServerMetadataHandle md) {
auto status = call_data->call.OnServerTrailingMetadata(*md);
if (!status.ok()) return ServerMetadataFromStatus(status);
return md;
});
return OnCancel(
Map(std::move(x),
[call_data](ServerMetadataHandle md) {
auto status = call_data->call.OnServerTrailingMetadata(*md);
if (!status.ok()) {
return ServerMetadataFromStatus(status);
}
return md;
}),
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[call_data, ctx = GetContext<grpc_call_context_element>()]() {
grpc_metadata_batch b;
b.Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
b.Set(GrpcCallWasCancelled(), true);
promise_detail::Context<grpc_call_context_element> context(ctx);
call_data->call.OnServerTrailingMetadata(b).IgnoreError();
});
}
template <typename Promise, typename Derived>
auto MapResult(void (Derived::Call::*fn)(ServerMetadata&), Promise x,
FilterCallData<Derived>* call_data) {
DCHECK(fn == &Derived::Call::OnServerTrailingMetadata);
return Map(std::move(x), [call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md);
return md;
});
return OnCancel(
Map(std::move(x),
[call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md);
return md;
}),
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[call_data, ctx = GetContext<grpc_call_context_element>()]() {
grpc_metadata_batch b;
b.Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
b.Set(GrpcCallWasCancelled(), true);
promise_detail::Context<grpc_call_context_element> context(ctx);
call_data->call.OnServerTrailingMetadata(b);
});
}
template <typename Promise, typename Derived>
auto MapResult(void (Derived::Call::*fn)(ServerMetadata&, Derived*), Promise x,
FilterCallData<Derived>* call_data) {
DCHECK(fn == &Derived::Call::OnServerTrailingMetadata);
return Map(std::move(x), [call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md, call_data->channel);
return md;
});
return OnCancel(
Map(std::move(x),
[call_data](ServerMetadataHandle md) {
call_data->call.OnServerTrailingMetadata(*md, call_data->channel);
return md;
}),
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[call_data, ctx = GetContext<grpc_call_context_element>()]() {
grpc_metadata_batch b;
b.Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
b.Set(GrpcCallWasCancelled(), true);
promise_detail::Context<grpc_call_context_element> context(ctx);
call_data->call.OnServerTrailingMetadata(b, call_data->channel);
});
}
template <typename Interceptor, typename Derived, typename SfinaeVoid = void>

@ -64,6 +64,7 @@ grpc_cc_test(
"//test/cpp/end2end:test_service_impl",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
"//src/core:dump_args",
],
)

@ -29,6 +29,7 @@
#include <grpcpp/support/status.h>
#include "src/core/ext/filters/logging/logging_filter.h"
#include "src/core/lib/gprpp/dump_args.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/cpp/ext/gcp/observability_logging_sink.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -843,7 +844,7 @@ TEST_F(LoggingTest, CancelledRpc) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_LT(absl::Now() - initial_time, absl::Seconds(10));
ASSERT_LT(absl::Now() - initial_time, absl::Seconds(10));
}
}

Loading…
Cancel
Save