[Max Message Limits] Improve logging to determine client/server (#36103)

Will help debugging errors like #35805

Closes #36103

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36103 from yashykt:HelpDebugMessageSizeLimits 5507a257f3
PiperOrigin-RevId: 615564790
pull/36119/head
Yash Tibrewal 11 months ago committed by Copybara-Service
parent 1ce894c977
commit 424e95ccbb
  1. 15
      src/core/ext/filters/http/message_compress/compression_filter.cc
  2. 3
      src/core/ext/filters/http/message_compress/compression_filter.h
  3. 13
      src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
  4. 3
      src/core/ext/filters/http/message_compress/legacy_compression_filter.h
  5. 23
      src/core/ext/filters/message_size/message_size_filter.cc
  6. 2
      src/objective-c/tests/InteropTests/InteropTests.m
  7. 14
      test/core/end2end/tests/max_message_length.cc

@ -169,7 +169,7 @@ MessageHandle ChannelCompression::CompressMessage(
}
absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
MessageHandle message, DecompressArgs args) const {
bool is_client, MessageHandle message, DecompressArgs args) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d",
message->payload()->Length(),
@ -186,8 +186,9 @@ absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
message->payload()->Length() >
static_cast<size_t>(*args.max_recv_message_length)) {
return absl::ResourceExhaustedError(absl::StrFormat(
"Received message larger than max (%u vs. %d)",
message->payload()->Length(), *args.max_recv_message_length));
"%s: Received message larger than max (%u vs. %d)",
is_client ? "CLIENT" : "SERVER", message->payload()->Length(),
*args.max_recv_message_length));
}
// Check if decompression is enabled (if not, we can just pass the message
// up).
@ -264,8 +265,8 @@ void ClientCompressionFilter::Call::OnServerInitialMetadata(
absl::StatusOr<MessageHandle>
ClientCompressionFilter::Call::OnServerToClientMessage(
MessageHandle message, ClientCompressionFilter* filter) {
return filter->compression_engine_.DecompressMessage(std::move(message),
decompress_args_);
return filter->compression_engine_.DecompressMessage(
/*is_client=*/true, std::move(message), decompress_args_);
}
void ServerCompressionFilter::Call::OnClientInitialMetadata(
@ -276,8 +277,8 @@ void ServerCompressionFilter::Call::OnClientInitialMetadata(
absl::StatusOr<MessageHandle>
ServerCompressionFilter::Call::OnClientToServerMessage(
MessageHandle message, ServerCompressionFilter* filter) {
return filter->compression_engine_.DecompressMessage(std::move(message),
decompress_args_);
return filter->compression_engine_.DecompressMessage(
/*is_client=*/false, std::move(message), decompress_args_);
}
void ServerCompressionFilter::Call::OnServerInitialMetadata(

@ -87,7 +87,8 @@ class ChannelCompression {
MessageHandle CompressMessage(MessageHandle message,
grpc_compression_algorithm algorithm) const;
// Decompress one message synchronously.
absl::StatusOr<MessageHandle> DecompressMessage(MessageHandle message,
absl::StatusOr<MessageHandle> DecompressMessage(bool is_client,
MessageHandle message,
DecompressArgs args) const;
private:

@ -166,7 +166,7 @@ MessageHandle LegacyCompressionFilter::CompressMessage(
}
absl::StatusOr<MessageHandle> LegacyCompressionFilter::DecompressMessage(
MessageHandle message, DecompressArgs args) const {
bool is_client, MessageHandle message, DecompressArgs args) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d",
message->payload()->Length(),
@ -183,8 +183,9 @@ absl::StatusOr<MessageHandle> LegacyCompressionFilter::DecompressMessage(
message->payload()->Length() >
static_cast<size_t>(*args.max_recv_message_length)) {
return absl::ResourceExhaustedError(absl::StrFormat(
"Received message larger than max (%u vs. %d)",
message->payload()->Length(), *args.max_recv_message_length));
"%s: Received message larger than max (%u vs. %d)",
is_client ? "CLIENT" : "SERVER", message->payload()->Length(),
*args.max_recv_message_length));
}
// Check if decompression is enabled (if not, we can just pass the message
// up).
@ -266,7 +267,8 @@ LegacyClientCompressionFilter::MakeCallPromise(
call_args.server_to_client_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), *decompress_args);
auto r = DecompressMessage(/*is_client=*/true, std::move(message),
*decompress_args);
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
@ -288,7 +290,8 @@ LegacyServerCompressionFilter::MakeCallPromise(
call_args.client_to_server_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), decompress_args);
auto r = DecompressMessage(/*is_client=*/false, std::move(message),
decompress_args);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s",
GetContext<Activity>()->DebugTag().c_str(),

@ -87,7 +87,8 @@ class LegacyCompressionFilter : public ChannelFilter {
MessageHandle CompressMessage(MessageHandle message,
grpc_compression_algorithm algorithm) const;
// Decompress one message synchronously.
absl::StatusOr<MessageHandle> DecompressMessage(MessageHandle message,
absl::StatusOr<MessageHandle> DecompressMessage(bool is_client,
MessageHandle message,
DecompressArgs args) const;
private:

@ -160,7 +160,7 @@ absl::StatusOr<ServerMessageSizeFilter> ServerMessageSizeFilter::Create(
namespace {
ServerMetadataHandle CheckPayload(const Message& msg,
absl::optional<uint32_t> max_length,
bool is_send) {
bool is_client, bool is_send) {
if (!max_length.has_value()) return nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_trace)) {
gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d",
@ -170,10 +170,11 @@ ServerMetadataHandle CheckPayload(const Message& msg,
if (msg.payload()->Length() <= *max_length) return nullptr;
auto r = GetContext<Arena>()->MakePooled<ServerMetadata>(GetContext<Arena>());
r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED);
r->Set(GrpcMessageMetadata(), Slice::FromCopiedString(absl::StrFormat(
"%s message larger than max (%u vs. %d)",
is_send ? "Sent" : "Received",
msg.payload()->Length(), *max_length)));
r->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(absl::StrFormat(
"%s: %s message larger than max (%u vs. %d)",
is_client ? "CLIENT" : "SERVER", is_send ? "Sent" : "Received",
msg.payload()->Length(), *max_length)));
return r;
}
} // namespace
@ -207,22 +208,26 @@ ClientMessageSizeFilter::Call::Call(ClientMessageSizeFilter* filter)
ServerMetadataHandle ServerMessageSizeFilter::Call::OnClientToServerMessage(
const Message& message, ServerMessageSizeFilter* filter) {
return CheckPayload(message, filter->parsed_config_.max_recv_size(), false);
return CheckPayload(message, filter->parsed_config_.max_recv_size(),
/*is_client=*/false, false);
}
ServerMetadataHandle ServerMessageSizeFilter::Call::OnServerToClientMessage(
const Message& message, ServerMessageSizeFilter* filter) {
return CheckPayload(message, filter->parsed_config_.max_send_size(), true);
return CheckPayload(message, filter->parsed_config_.max_send_size(),
/*is_client=*/false, true);
}
ServerMetadataHandle ClientMessageSizeFilter::Call::OnClientToServerMessage(
const Message& message) {
return CheckPayload(message, limits_.max_send_size(), true);
return CheckPayload(message, limits_.max_send_size(), /*is_client=*/true,
true);
}
ServerMetadataHandle ClientMessageSizeFilter::Call::OnServerToClientMessage(
const Message& message) {
return CheckPayload(message, limits_.max_recv_size(), false);
return CheckPayload(message, limits_.max_recv_size(), /*is_client=*/true,
false);
}
namespace {

@ -958,7 +958,7 @@ static dispatch_once_t initGlobalInterceptorFactory;
// https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/field_mask.proto
XCTAssertEqualObjects(
error.localizedDescription,
@"Received message larger than max (4194305 vs. 4194304)");
@"CLIENT: Received message larger than max (4194305 vs. 4194304)");
[expectation fulfill];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);

@ -45,7 +45,8 @@ void TestMaxMessageLengthOnClientOnRequest(CoreEnd2endTest& test) {
test.Expect(1, true);
test.Step();
EXPECT_EQ(server_status.status(), GRPC_STATUS_RESOURCE_EXHAUSTED);
EXPECT_EQ(server_status.message(), "Sent message larger than max (11 vs. 5)");
EXPECT_EQ(server_status.message(),
"CLIENT: Sent message larger than max (11 vs. 5)");
}
void TestMaxMessageLengthOnServerOnRequest(CoreEnd2endTest& test) {
@ -71,7 +72,7 @@ void TestMaxMessageLengthOnServerOnRequest(CoreEnd2endTest& test) {
EXPECT_TRUE(client_close.was_cancelled());
EXPECT_EQ(server_status.status(), GRPC_STATUS_RESOURCE_EXHAUSTED);
EXPECT_EQ(server_status.message(),
"Received message larger than max (11 vs. 5)");
"SERVER: Received message larger than max (11 vs. 5)");
}
void TestMaxMessageLengthOnClientOnResponse(CoreEnd2endTest& test) {
@ -100,7 +101,7 @@ void TestMaxMessageLengthOnClientOnResponse(CoreEnd2endTest& test) {
EXPECT_EQ(s.method(), "/service/method");
EXPECT_EQ(server_status.status(), GRPC_STATUS_RESOURCE_EXHAUSTED);
EXPECT_EQ(server_status.message(),
"Received message larger than max (11 vs. 5)");
"CLIENT: Received message larger than max (11 vs. 5)");
}
void TestMaxMessageLengthOnServerOnResponse(CoreEnd2endTest& test) {
@ -128,7 +129,8 @@ void TestMaxMessageLengthOnServerOnResponse(CoreEnd2endTest& test) {
test.Step();
EXPECT_EQ(s.method(), "/service/method");
EXPECT_EQ(server_status.status(), GRPC_STATUS_RESOURCE_EXHAUSTED);
EXPECT_EQ(server_status.message(), "Sent message larger than max (11 vs. 5)");
EXPECT_EQ(server_status.message(),
"SERVER: Sent message larger than max (11 vs. 5)");
}
CORE_END2END_TEST(CoreEnd2endTest,
@ -276,7 +278,7 @@ CORE_END2END_TEST(Http2Test, MaxMessageLengthOnServerOnRequestWithCompression) {
EXPECT_TRUE(client_close.was_cancelled());
EXPECT_EQ(server_status.status(), GRPC_STATUS_RESOURCE_EXHAUSTED);
EXPECT_THAT(server_status.message(),
StartsWith("Received message larger than max"));
StartsWith("SERVER: Received message larger than max"));
}
CORE_END2END_TEST(Http2Test,
@ -311,7 +313,7 @@ CORE_END2END_TEST(Http2Test,
EXPECT_EQ(s.method(), "/service/method");
EXPECT_EQ(server_status.status(), GRPC_STATUS_RESOURCE_EXHAUSTED);
EXPECT_THAT(server_status.message(),
StartsWith("Received message larger than max"));
StartsWith("CLIENT: Received message larger than max"));
}
} // namespace

Loading…
Cancel
Save