[promises] Finishing off the server stack (#32158)

To be merged after #31448 #32110 #32094 

<!--

If you know who should review your pull request, please assign it to
that person; otherwise, the pull request will be assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/32345/head
Craig Tiller 2 years ago committed by GitHub
parent 033d55ffd3
commit 98caaaefbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 2
      build_autogenerated.yaml
  3. 15
      src/core/BUILD
  4. 6
      src/core/ext/filters/http/client/http_client_filter.cc
  5. 13
      src/core/ext/filters/http/message_compress/compression_filter.cc
  6. 365
      src/core/ext/filters/message_size/message_size_filter.cc
  7. 51
      src/core/ext/filters/message_size/message_size_filter.h
  8. 2
      src/core/ext/transport/inproc/inproc_transport.cc
  9. 166
      src/core/lib/channel/connected_channel.cc
  10. 87
      src/core/lib/channel/promise_based_filter.cc
  11. 12
      src/core/lib/channel/promise_based_filter.h
  12. 4
      src/core/lib/iomgr/call_combiner.h
  13. 4
      src/core/lib/promise/detail/promise_factory.h
  14. 3
      src/core/lib/promise/interceptor_list.h
  15. 4
      src/core/lib/promise/latch.h
  16. 22
      src/core/lib/security/transport/server_auth_filter.cc
  17. 2
      src/core/lib/slice/slice.cc
  18. 676
      src/core/lib/surface/call.cc
  19. 1
      src/core/lib/surface/lame_client.cc
  20. 12
      src/core/lib/transport/metadata_batch.h
  21. 67
      src/core/lib/transport/transport.h
  22. 8
      test/core/end2end/fixtures/h2_oauth2_tls12.cc
  23. 8
      test/core/end2end/fixtures/h2_oauth2_tls13.cc
  24. 2
      test/core/end2end/fixtures/proxy.cc
  25. 27
      test/core/end2end/tests/filter_init_fails.cc
  26. 20
      test/core/end2end/tests/max_message_length.cc
  27. 12
      test/core/end2end/tests/streaming_error_response.cc
  28. 6
      test/core/filters/client_auth_filter_test.cc
  29. 6
      test/core/filters/client_authority_filter_test.cc
  30. 1
      test/core/filters/filter_fuzzer.cc

@ -1471,6 +1471,7 @@ grpc_cc_library(
"//src/core:iomgr_fwd",
"//src/core:iomgr_port",
"//src/core:json",
"//src/core:latch",
"//src/core:map",
"//src/core:match",
"//src/core:memory_quota",

@ -3730,6 +3730,7 @@ libs:
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/pipe.h
@ -7544,6 +7545,7 @@ targets:
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/pipe.h

@ -3437,33 +3437,38 @@ grpc_cc_library(
"ext/filters/message_size/message_size_filter.h",
],
external_deps = [
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/types:optional",
],
language = "c++",
deps = [
"activity",
"arena",
"arena_promise",
"channel_args",
"channel_fwd",
"channel_init",
"channel_stack_type",
"closure",
"error",
"context",
"grpc_service_config",
"json",
"json_args",
"json_object_loader",
"latch",
"poll",
"race",
"service_config_parser",
"slice",
"slice_buffer",
"status_helper",
"validation_errors",
"//:channel_stack_builder",
"//:config",
"//:debug_location",
"//:gpr",
"//:grpc_base",
"//:grpc_public_hdrs",
"//:grpc_trace",
],
)

@ -133,13 +133,13 @@ ArenaPromise<ServerMetadataHandle> HttpClientFilter::MakeCallPromise(
return std::move(md);
});
return Race(Map(next_promise_factory(std::move(call_args)),
return Race(initial_metadata_err->Wait(),
Map(next_promise_factory(std::move(call_args)),
[](ServerMetadataHandle md) -> ServerMetadataHandle {
auto r = CheckServerMetadata(md.get());
if (!r.ok()) return ServerMetadataFromStatus(r);
return md;
}),
initial_metadata_err->Wait());
}));
}
HttpClientFilter::HttpClientFilter(HttpSchemeMetadata::ValueType scheme,

@ -233,7 +233,7 @@ ArenaPromise<ServerMetadataHandle> ClientCompressionFilter::MakeCallPromise(
return CompressMessage(std::move(message), compression_algorithm);
});
auto* decompress_args = GetContext<Arena>()->New<DecompressArgs>(
DecompressArgs{GRPC_COMPRESS_NONE, absl::nullopt});
DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt});
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.server_initial_metadata->InterceptAndMap(
@ -254,8 +254,8 @@ ArenaPromise<ServerMetadataHandle> ClientCompressionFilter::MakeCallPromise(
return std::move(*r);
});
// Run the next filter, and race it with getting an error from decompression.
return Race(next_promise_factory(std::move(call_args)),
decompress_err->Wait());
return Race(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
}
ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
@ -269,7 +269,8 @@ ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), decompress_args);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "DecompressMessage returned %s",
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s",
Activity::current()->DebugTag().c_str(),
r.status().ToString().c_str());
}
if (!r.ok()) {
@ -300,8 +301,8 @@ ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
// - decompress incoming messages
// - wait for initial metadata to be sent, and then commence compression of
// outgoing messages
return Race(next_promise_factory(std::move(call_args)),
decompress_err->Wait());
return Race(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
}
} // namespace grpc_core

@ -18,10 +18,13 @@
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include <inttypes.h>
#include <functional>
#include <initializer_list>
#include <new>
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include <grpc/grpc.h>
@ -32,21 +35,22 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
static void recv_message_ready(void* user_data, grpc_error_handle error);
static void recv_trailing_metadata_ready(void* user_data,
grpc_error_handle error);
namespace grpc_core {
//
@ -124,251 +128,164 @@ size_t MessageSizeParser::ParserIndex() {
parser_name());
}
} // namespace grpc_core
namespace {
struct channel_data {
grpc_core::MessageSizeParsedConfig limits;
const size_t service_config_parser_index{
grpc_core::MessageSizeParser::ParserIndex()};
};
//
// MessageSizeFilter
//
struct call_data {
call_data(grpc_call_element* elem, const channel_data& chand,
const grpc_call_element_args& args)
: call_combiner(args.call_combiner), limits(chand.limits) {
GRPC_CLOSURE_INIT(&recv_message_ready, ::recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
::recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
// size to the receive limit.
const grpc_core::MessageSizeParsedConfig* config_from_call_context =
grpc_core::MessageSizeParsedConfig::GetFromCallContext(
args.context, chand.service_config_parser_index);
if (config_from_call_context != nullptr) {
absl::optional<uint32_t> max_send_size = limits.max_send_size();
absl::optional<uint32_t> max_recv_size = limits.max_recv_size();
if (config_from_call_context->max_send_size().has_value() &&
(!max_send_size.has_value() ||
*config_from_call_context->max_send_size() < *max_send_size)) {
max_send_size = *config_from_call_context->max_send_size();
const grpc_channel_filter ClientMessageSizeFilter::kFilter =
MakePromiseBasedFilter<ClientMessageSizeFilter, FilterEndpoint::kClient,
kFilterExaminesOutboundMessages |
kFilterExaminesInboundMessages>("message_size");
const grpc_channel_filter ServerMessageSizeFilter::kFilter =
MakePromiseBasedFilter<ServerMessageSizeFilter, FilterEndpoint::kServer,
kFilterExaminesOutboundMessages |
kFilterExaminesInboundMessages>("message_size");
class MessageSizeFilter::CallBuilder {
private:
auto Interceptor(uint32_t max_length, bool is_send) {
return [max_length, is_send,
err = err_](MessageHandle msg) -> absl::optional<MessageHandle> {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d",
Activity::current()->DebugTag().c_str(),
is_send ? "send" : "recv", msg->payload()->Length(),
max_length);
}
if (config_from_call_context->max_recv_size().has_value() &&
(!max_recv_size.has_value() ||
*config_from_call_context->max_recv_size() < *max_recv_size)) {
max_recv_size = *config_from_call_context->max_recv_size();
if (msg->payload()->Length() > max_length) {
if (err->is_set()) return std::move(msg);
auto r = GetContext<Arena>()->MakePooled<ServerMetadata>(
GetContext<Arena>());
r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED);
r->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(
absl::StrFormat("%s message larger than max (%u vs. %d)",
is_send ? "Sent" : "Received",
msg->payload()->Length(), max_length)));
err->Set(std::move(r));
return absl::nullopt;
}
limits = grpc_core::MessageSizeParsedConfig(max_send_size, max_recv_size);
}
return std::move(msg);
};
}
~call_data() {}
grpc_core::CallCombiner* call_combiner;
grpc_core::MessageSizeParsedConfig limits;
// Receive closures are chained: we inject this closure as the
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
grpc_closure recv_message_ready;
grpc_closure recv_trailing_metadata_ready;
// The error caused by a message that is too large, or absl::OkStatus()
grpc_error_handle error;
// Used by recv_message_ready.
absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready = nullptr;
// Original recv_trailing_metadata callback, invoked after our own.
grpc_closure* original_recv_trailing_metadata_ready;
bool seen_recv_trailing_metadata = false;
grpc_error_handle recv_trailing_metadata_error;
};
} // namespace
public:
explicit CallBuilder(const MessageSizeParsedConfig& limits)
: limits_(limits) {}
// Callback invoked when we receive a message. Here we check the max
// receive message size.
static void recv_message_ready(void* user_data, grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->recv_message->has_value() &&
calld->limits.max_recv_size().has_value() &&
(*calld->recv_message)->Length() >
static_cast<size_t>(*calld->limits.max_recv_size())) {
grpc_error_handle new_error = grpc_error_set_int(
GRPC_ERROR_CREATE(absl::StrFormat(
"Received message larger than max (%u vs. %d)",
(*calld->recv_message)->Length(), *calld->limits.max_recv_size())),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_RESOURCE_EXHAUSTED);
error = grpc_error_add_child(error, new_error);
calld->error = error;
template <typename T>
void AddSend(T* pipe_end) {
if (!limits_.max_send_size().has_value()) return;
pipe_end->InterceptAndMap(Interceptor(*limits_.max_send_size(), true));
}
// Invoke the next callback.
grpc_closure* closure = calld->next_recv_message_ready;
calld->next_recv_message_ready = nullptr;
if (calld->seen_recv_trailing_metadata) {
// We might potentially see another RECV_MESSAGE op. In that case, we do not
// want to run the recv_trailing_metadata_ready closure again. The newer
// RECV_MESSAGE op cannot cause any errors since the transport has already
// invoked the recv_trailing_metadata_ready closure and all further
// RECV_MESSAGE ops will get null payloads.
calld->seen_recv_trailing_metadata = false;
GRPC_CALL_COMBINER_START(calld->call_combiner,
&calld->recv_trailing_metadata_ready,
calld->recv_trailing_metadata_error,
"continue recv_trailing_metadata_ready");
template <typename T>
void AddRecv(T* pipe_end) {
if (!limits_.max_recv_size().has_value()) return;
pipe_end->InterceptAndMap(Interceptor(*limits_.max_recv_size(), false));
}
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
}
// Callback invoked on completion of recv_trailing_metadata
// Notifies the recv_trailing_metadata batch of any message size failures
static void recv_trailing_metadata_ready(void* user_data,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->next_recv_message_ready != nullptr) {
calld->seen_recv_trailing_metadata = true;
calld->recv_trailing_metadata_error = error;
GRPC_CALL_COMBINER_STOP(calld->call_combiner,
"deferring recv_trailing_metadata_ready until "
"after recv_message_ready");
return;
ArenaPromise<ServerMetadataHandle> Run(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
return Race(err_->Wait(), next_promise_factory(std::move(call_args)));
}
error = grpc_error_add_child(error, calld->error);
// Invoke the next callback.
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready, error);
}
// Start transport stream op.
static void message_size_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// Check max send message size.
if (op->send_message && calld->limits.max_send_size().has_value() &&
op->payload->send_message.send_message->Length() >
static_cast<size_t>(*calld->limits.max_send_size())) {
grpc_transport_stream_op_batch_finish_with_failure(
op,
grpc_error_set_int(GRPC_ERROR_CREATE(absl::StrFormat(
"Sent message larger than max (%u vs. %d)",
op->payload->send_message.send_message->Length(),
*calld->limits.max_send_size())),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_RESOURCE_EXHAUSTED),
calld->call_combiner);
return;
}
// Inject callback for receiving a message.
if (op->recv_message) {
calld->next_recv_message_ready =
op->payload->recv_message.recv_message_ready;
calld->recv_message = op->payload->recv_message.recv_message;
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
// Inject callback for receiving trailing metadata.
if (op->recv_trailing_metadata) {
calld->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
// Chain to the next filter.
grpc_call_next_op(elem, op);
}
private:
Latch<ServerMetadataHandle>* const err_ =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
MessageSizeParsedConfig limits_;
};
// Constructor for call_data.
static grpc_error_handle message_size_init_call_elem(
grpc_call_element* elem, const grpc_call_element_args* args) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (elem->call_data) call_data(elem, *chand, *args);
return absl::OkStatus();
// Factory for the client-side message-size filter. Constructs the filter
// directly from the channel args (the base-class constructor pulls the
// configured size limits out of `args`); construction cannot fail, so this
// always returns an OK StatusOr.
absl::StatusOr<ClientMessageSizeFilter> ClientMessageSizeFilter::Create(
    const ChannelArgs& args, ChannelFilter::Args) {
  return ClientMessageSizeFilter(args);
}
// Destructor for call_data.
static void message_size_destroy_call_elem(
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->~call_data();
// Factory for the server-side message-size filter. Mirrors
// ClientMessageSizeFilter::Create: the filter is built straight from the
// channel args and construction cannot fail, so the StatusOr is always OK.
absl::StatusOr<ServerMessageSizeFilter> ServerMessageSizeFilter::Create(
    const ChannelArgs& args, ChannelFilter::Args) {
  return ServerMessageSizeFilter(args);
}
// Constructor for channel_data.
static grpc_error_handle message_size_init_channel_elem(
grpc_channel_element* elem, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (chand) channel_data();
chand->limits = grpc_core::MessageSizeParsedConfig::GetFromChannelArgs(
args->channel_args);
return absl::OkStatus();
}
ArenaPromise<ServerMetadataHandle> ClientMessageSizeFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
// size to the receive limit.
MessageSizeParsedConfig limits = this->limits();
const MessageSizeParsedConfig* config_from_call_context =
MessageSizeParsedConfig::GetFromCallContext(
GetContext<grpc_call_context_element>(),
service_config_parser_index_);
if (config_from_call_context != nullptr) {
absl::optional<uint32_t> max_send_size = limits.max_send_size();
absl::optional<uint32_t> max_recv_size = limits.max_recv_size();
if (config_from_call_context->max_send_size().has_value() &&
(!max_send_size.has_value() ||
*config_from_call_context->max_send_size() < *max_send_size)) {
max_send_size = *config_from_call_context->max_send_size();
}
if (config_from_call_context->max_recv_size().has_value() &&
(!max_recv_size.has_value() ||
*config_from_call_context->max_recv_size() < *max_recv_size)) {
max_recv_size = *config_from_call_context->max_recv_size();
}
limits = MessageSizeParsedConfig(max_send_size, max_recv_size);
}
// Destructor for channel_data.
static void message_size_destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
chand->~channel_data();
CallBuilder b(limits);
b.AddSend(call_args.client_to_server_messages);
b.AddRecv(call_args.server_to_client_messages);
return b.Run(std::move(call_args), std::move(next_promise_factory));
}
const grpc_channel_filter grpc_message_size_filter = {
message_size_start_transport_stream_op_batch,
nullptr,
grpc_channel_next_op,
sizeof(call_data),
message_size_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
message_size_destroy_call_elem,
sizeof(channel_data),
message_size_init_channel_elem,
grpc_channel_stack_no_post_init,
message_size_destroy_channel_elem,
grpc_channel_next_get_info,
"message_size"};
// Builds the per-call promise for the server side. Note the direction of the
// limits relative to the client filter: on the server, outgoing messages flow
// on server_to_client_messages (checked against the send limit) and incoming
// messages flow on client_to_server_messages (checked against the receive
// limit). Unlike the client filter, no per-method service-config override is
// applied here — per-method config is only available on the client.
ArenaPromise<ServerMetadataHandle> ServerMessageSizeFilter::MakeCallPromise(
    CallArgs call_args, NextPromiseFactory next_promise_factory) {
  CallBuilder b(limits());
  b.AddSend(call_args.server_to_client_messages);
  b.AddRecv(call_args.client_to_server_messages);
  return b.Run(std::move(call_args), std::move(next_promise_factory));
}
namespace {
// Used for GRPC_CLIENT_SUBCHANNEL
static bool maybe_add_message_size_filter_subchannel(
grpc_core::ChannelStackBuilder* builder) {
bool MaybeAddMessageSizeFilterToSubchannel(ChannelStackBuilder* builder) {
if (builder->channel_args().WantMinimalStack()) {
return true;
}
builder->PrependFilter(&grpc_message_size_filter);
builder->PrependFilter(&ClientMessageSizeFilter::kFilter);
return true;
}
// Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the filter
// only if message size limits or service config is specified.
static bool maybe_add_message_size_filter(
grpc_core::ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (channel_args.WantMinimalStack()) {
// Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the
// filter only if message size limits or service config is specified.
auto MaybeAddMessageSizeFilter(const grpc_channel_filter* filter) {
return [filter](ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (channel_args.WantMinimalStack()) {
return true;
}
MessageSizeParsedConfig limits =
MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
const bool enable =
limits.max_send_size().has_value() ||
limits.max_recv_size().has_value() ||
channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
if (enable) builder->PrependFilter(filter);
return true;
}
grpc_core::MessageSizeParsedConfig limits =
grpc_core::MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
const bool enable =
limits.max_send_size().has_value() ||
limits.max_recv_size().has_value() ||
channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
if (enable) builder->PrependFilter(&grpc_message_size_filter);
return true;
};
}
namespace grpc_core {
} // namespace
void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
MessageSizeParser::Register(builder);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter_subchannel);
builder->channel_init()->RegisterStage(GRPC_CLIENT_DIRECT_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter);
builder->channel_init()->RegisterStage(GRPC_SERVER_CHANNEL,
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter);
MaybeAddMessageSizeFilterToSubchannel);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
MaybeAddMessageSizeFilter(&ClientMessageSizeFilter::kFilter));
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
MaybeAddMessageSizeFilter(&ServerMessageSizeFilter::kFilter));
}
} // namespace grpc_core

@ -24,21 +24,22 @@
#include <memory>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/service_config/service_config_parser.h"
extern const grpc_channel_filter grpc_message_size_filter;
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
@ -85,6 +86,50 @@ class MessageSizeParser : public ServiceConfigParser::Parser {
absl::optional<uint32_t> GetMaxRecvSizeFromChannelArgs(const ChannelArgs& args);
absl::optional<uint32_t> GetMaxSendSizeFromChannelArgs(const ChannelArgs& args);
// Shared base for the client- and server-side message-size filters. Holds the
// send/receive size limits parsed from the channel args at filter-construction
// time; subclasses read them via limits() when building the per-call promise.
class MessageSizeFilter : public ChannelFilter {
 protected:
  // Parses the size limits out of the channel args once, at channel setup.
  explicit MessageSizeFilter(const ChannelArgs& args)
      : limits_(MessageSizeParsedConfig::GetFromChannelArgs(args)) {}

  // Per-call helper that installs the length-check interceptors; defined in
  // the .cc file.
  class CallBuilder;

  // Channel-level limits (per-method overrides, if any, are merged in by the
  // client filter at call time).
  const MessageSizeParsedConfig& limits() const { return limits_; }

 private:
  MessageSizeParsedConfig limits_;
};
// Server-side message-size enforcement filter (promise-based).
class ServerMessageSizeFilter final : public MessageSizeFilter {
 public:
  // Channel-filter vtable used to register this filter in the channel stack.
  static const grpc_channel_filter kFilter;

  // Builds the filter from channel args; never fails (see the .cc definition).
  static absl::StatusOr<ServerMessageSizeFilter> Create(
      const ChannelArgs& args, ChannelFilter::Args filter_args);

  // Construct a promise for one call.
  ArenaPromise<ServerMetadataHandle> MakeCallPromise(
      CallArgs call_args, NextPromiseFactory next_promise_factory) override;

 private:
  // Inherit the base constructor; no server-specific state is needed.
  using MessageSizeFilter::MessageSizeFilter;
};
// Client-side message-size enforcement filter (promise-based). Unlike the
// server filter, it also consults per-method service config at call time,
// looked up via the cached parser index below.
class ClientMessageSizeFilter final : public MessageSizeFilter {
 public:
  // Channel-filter vtable used to register this filter in the channel stack.
  static const grpc_channel_filter kFilter;

  // Builds the filter from channel args; never fails (see the .cc definition).
  static absl::StatusOr<ClientMessageSizeFilter> Create(
      const ChannelArgs& args, ChannelFilter::Args filter_args);

  // Construct a promise for one call.
  ArenaPromise<ServerMetadataHandle> MakeCallPromise(
      CallArgs call_args, NextPromiseFactory next_promise_factory) override;

 private:
  // Cached index of MessageSizeParser in the service-config parser registry,
  // used to fetch per-method limits from the call context.
  const size_t service_config_parser_index_{MessageSizeParser::ParserIndex()};

  // Inherit the base constructor.
  using MessageSizeFilter::MessageSizeFilter;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H

@ -766,6 +766,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
nullptr);
s->to_read_trailing_md.Clear();
s->to_read_trailing_md_filled = false;
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata->Set(grpc_core::GrpcStatusFromWire(), true);
// We should schedule the recv_trailing_md_op completion if
// 1. this stream is the client-side

@ -56,6 +56,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
@ -73,6 +74,7 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_fwd.h"
@ -478,7 +480,15 @@ class ConnectedChannelStream : public Orphanable {
return Match(
recv_message_state_, [](Idle) -> std::string { return "IDLE"; },
[](Closed) -> std::string { return "CLOSED"; },
[](const PendingReceiveMessage&) -> std::string { return "WAITING"; },
[](const PendingReceiveMessage& m) -> std::string {
if (m.received) {
return absl::StrCat("RECEIVED_FROM_TRANSPORT:",
m.payload.has_value()
? absl::StrCat(m.payload->Length(), "b")
: "EOS");
}
return "WAITING";
},
[](const absl::optional<MessageHandle>& message) -> std::string {
return absl::StrCat(
"READY:", message.has_value()
@ -570,13 +580,7 @@ class ConnectedChannelStream : public Orphanable {
void RecvMessageBatchDone(grpc_error_handle error) {
{
MutexLock lock(mu());
if (error != absl::OkStatus()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[connected] RecvMessageBatchDone: error=%s",
recv_message_waker_.ActivityDebugTag().c_str(),
StatusToString(error).c_str());
}
} else if (absl::holds_alternative<Closed>(recv_message_state_)) {
if (absl::holds_alternative<Closed>(recv_message_state_)) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[connected] RecvMessageBatchDone: already closed, "
@ -584,14 +588,21 @@ class ConnectedChannelStream : public Orphanable {
recv_message_waker_.ActivityDebugTag().c_str());
}
} else {
if (grpc_call_trace.enabled()) {
auto pending =
absl::get_if<PendingReceiveMessage>(&recv_message_state_);
GPR_ASSERT(pending != nullptr);
if (!error.ok()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[connected] RecvMessageBatchDone: error=%s",
recv_message_waker_.ActivityDebugTag().c_str(),
StatusToString(error).c_str());
}
pending->payload.reset();
} else if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[connected] RecvMessageBatchDone: received message",
recv_message_waker_.ActivityDebugTag().c_str());
}
auto pending =
absl::get_if<PendingReceiveMessage>(&recv_message_state_);
GPR_ASSERT(pending != nullptr);
GPR_ASSERT(pending->received == false);
pending->received = true;
}
@ -671,6 +682,8 @@ class ClientStream : public ConnectedChannelStream {
public:
ClientStream(grpc_transport* transport, CallArgs call_args)
: ConnectedChannelStream(transport),
client_initial_metadata_outstanding_token_(
std::move(call_args.client_initial_metadata_outstanding)),
server_initial_metadata_pipe_(call_args.server_initial_metadata),
client_to_server_messages_(call_args.client_to_server_messages),
server_to_client_messages_(call_args.server_to_client_messages),
@ -704,12 +717,14 @@ class ClientStream : public ConnectedChannelStream {
nullptr, GetContext<Arena>());
grpc_transport_set_pops(transport(), stream(),
GetContext<CallContext>()->polling_entity());
memset(&metadata_, 0, sizeof(metadata_));
metadata_.send_initial_metadata = true;
metadata_.recv_initial_metadata = true;
metadata_.recv_trailing_metadata = true;
metadata_.payload = batch_payload();
metadata_.on_complete = &metadata_batch_done_;
memset(&send_metadata_, 0, sizeof(send_metadata_));
memset(&recv_metadata_, 0, sizeof(recv_metadata_));
send_metadata_.send_initial_metadata = true;
recv_metadata_.recv_initial_metadata = true;
recv_metadata_.recv_trailing_metadata = true;
send_metadata_.payload = batch_payload();
recv_metadata_.payload = batch_payload();
send_metadata_.on_complete = &send_metadata_batch_done_;
batch_payload()->send_initial_metadata.send_initial_metadata =
client_initial_metadata_.get();
batch_payload()->send_initial_metadata.peer_string =
@ -734,9 +749,16 @@ class ClientStream : public ConnectedChannelStream {
IncrementRefCount("metadata_batch_done");
IncrementRefCount("initial_metadata_ready");
IncrementRefCount("trailing_metadata_ready");
initial_metadata_waker_ = Activity::current()->MakeOwningWaker();
trailing_metadata_waker_ = Activity::current()->MakeOwningWaker();
SchedulePush(&metadata_);
recv_initial_metadata_waker_ = Activity::current()->MakeOwningWaker();
recv_trailing_metadata_waker_ = Activity::current()->MakeOwningWaker();
send_initial_metadata_waker_ = Activity::current()->MakeOwningWaker();
SchedulePush(&send_metadata_);
SchedulePush(&recv_metadata_);
}
if (std::exchange(need_to_clear_client_initial_metadata_outstanding_token_,
false)) {
client_initial_metadata_outstanding_token_.Complete(
client_initial_metadata_send_result_);
}
if (server_initial_metadata_state_ ==
ServerInitialMetadataState::kReceivedButNotPushed) {
@ -753,9 +775,21 @@ class ClientStream : public ConnectedChannelStream {
server_initial_metadata_push_promise_.reset();
}
}
if (server_initial_metadata_state_ == ServerInitialMetadataState::kError) {
server_initial_metadata_pipe_->Close();
}
PollSendMessage(client_to_server_messages_, &client_trailing_metadata_);
PollRecvMessage(server_to_client_messages_);
if (server_initial_metadata_state_ == ServerInitialMetadataState::kPushed &&
if (grpc_call_trace.enabled()) {
gpr_log(
GPR_INFO,
"%s[connected] Finishing PollConnectedChannel: requesting metadata",
Activity::current()->DebugTag().c_str());
}
if ((server_initial_metadata_state_ ==
ServerInitialMetadataState::kPushed ||
server_initial_metadata_state_ ==
ServerInitialMetadataState::kError) &&
!IsPromiseReceiving() &&
std::exchange(queued_trailing_metadata_, false)) {
if (grpc_call_trace.enabled()) {
@ -774,18 +808,32 @@ class ClientStream : public ConnectedChannelStream {
}
void RecvInitialMetadataReady(grpc_error_handle error) {
GPR_ASSERT(error == absl::OkStatus());
{
MutexLock lock(mu());
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] RecvInitialMetadataReady: error=%s",
recv_initial_metadata_waker_.ActivityDebugTag().c_str(),
error.ToString().c_str());
}
server_initial_metadata_state_ =
ServerInitialMetadataState::kReceivedButNotPushed;
initial_metadata_waker_.Wakeup();
error.ok() ? ServerInitialMetadataState::kReceivedButNotPushed
: ServerInitialMetadataState::kError;
recv_initial_metadata_waker_.Wakeup();
}
Unref("initial_metadata_ready");
}
void RecvTrailingMetadataReady(grpc_error_handle error) {
GPR_ASSERT(error == absl::OkStatus());
if (!error.ok()) {
server_trailing_metadata_->Clear();
grpc_status_code status = GRPC_STATUS_UNKNOWN;
std::string message;
grpc_error_get_status(error, Timestamp::InfFuture(), &status, &message,
nullptr, nullptr);
server_trailing_metadata_->Set(GrpcStatusMetadata(), status);
server_trailing_metadata_->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(message));
}
{
MutexLock lock(mu());
queued_trailing_metadata_ = true;
@ -794,16 +842,21 @@ class ClientStream : public ConnectedChannelStream {
"%s[connected] RecvTrailingMetadataReady: "
"queued_trailing_metadata_ "
"set to true; active_ops: %s",
trailing_metadata_waker_.ActivityDebugTag().c_str(),
recv_trailing_metadata_waker_.ActivityDebugTag().c_str(),
ActiveOpsString().c_str());
}
trailing_metadata_waker_.Wakeup();
recv_trailing_metadata_waker_.Wakeup();
}
Unref("trailing_metadata_ready");
}
void MetadataBatchDone(grpc_error_handle error) {
GPR_ASSERT(error == absl::OkStatus());
void SendMetadataBatchDone(grpc_error_handle error) {
{
MutexLock lock(mu());
need_to_clear_client_initial_metadata_outstanding_token_ = true;
client_initial_metadata_send_result_ = error.ok();
send_initial_metadata_waker_.Wakeup();
}
Unref("metadata_batch_done");
}
@ -823,6 +876,8 @@ class ClientStream : public ConnectedChannelStream {
// has been pushed on the pipe to publish it up the call stack AND removed
// by the call at the top.
kPushed,
// Received initial metadata with an error status.
kError,
};
std::string ActiveOpsString() const override
@ -831,10 +886,10 @@ class ClientStream : public ConnectedChannelStream {
if (finished()) ops.push_back("FINISHED");
// Outstanding Operations on Transport
std::vector<std::string> waiting;
if (initial_metadata_waker_ != Waker()) {
if (recv_initial_metadata_waker_ != Waker()) {
waiting.push_back("initial_metadata");
}
if (trailing_metadata_waker_ != Waker()) {
if (recv_trailing_metadata_waker_ != Waker()) {
waiting.push_back("trailing_metadata");
}
if (!waiting.empty()) {
@ -850,6 +905,18 @@ class ClientStream : public ConnectedChannelStream {
if (!queued.empty()) {
ops.push_back(absl::StrCat("queued:", absl::StrJoin(queued, ",")));
}
switch (server_initial_metadata_state_) {
case ServerInitialMetadataState::kNotReceived:
case ServerInitialMetadataState::kReceivedButNotPushed:
case ServerInitialMetadataState::kPushed:
break;
case ServerInitialMetadataState::kPushing:
ops.push_back("server_initial_metadata:PUSHING");
break;
case ServerInitialMetadataState::kError:
ops.push_back("server_initial_metadata:ERROR");
break;
}
// Send message
std::string send_message_state = SendMessageString();
if (send_message_state != "WAITING") {
@ -864,11 +931,17 @@ class ClientStream : public ConnectedChannelStream {
}
bool requested_metadata_ = false;
bool need_to_clear_client_initial_metadata_outstanding_token_
ABSL_GUARDED_BY(mu()) = false;
bool client_initial_metadata_send_result_ ABSL_GUARDED_BY(mu());
ServerInitialMetadataState server_initial_metadata_state_
ABSL_GUARDED_BY(mu()) = ServerInitialMetadataState::kNotReceived;
bool queued_trailing_metadata_ ABSL_GUARDED_BY(mu()) = false;
Waker initial_metadata_waker_ ABSL_GUARDED_BY(mu());
Waker trailing_metadata_waker_ ABSL_GUARDED_BY(mu());
Waker recv_initial_metadata_waker_ ABSL_GUARDED_BY(mu());
Waker send_initial_metadata_waker_ ABSL_GUARDED_BY(mu());
Waker recv_trailing_metadata_waker_ ABSL_GUARDED_BY(mu());
ClientInitialMetadataOutstandingToken
client_initial_metadata_outstanding_token_;
PipeSender<ServerMetadataHandle>* server_initial_metadata_pipe_;
PipeReceiver<MessageHandle>* client_to_server_messages_;
PipeSender<MessageHandle>* server_to_client_messages_;
@ -884,9 +957,10 @@ class ClientStream : public ConnectedChannelStream {
ServerMetadataHandle server_trailing_metadata_;
absl::optional<PipeSender<ServerMetadataHandle>::PushType>
server_initial_metadata_push_promise_;
grpc_transport_stream_op_batch metadata_;
grpc_closure metadata_batch_done_ =
MakeMemberClosure<ClientStream, &ClientStream::MetadataBatchDone>(
grpc_transport_stream_op_batch send_metadata_;
grpc_transport_stream_op_batch recv_metadata_;
grpc_closure send_metadata_batch_done_ =
MakeMemberClosure<ClientStream, &ClientStream::SendMetadataBatchDone>(
this, DEBUG_LOCATION);
};
@ -948,6 +1022,7 @@ class ServerStream final : public ConnectedChannelStream {
gim.client_initial_metadata.get();
batch_payload()->recv_initial_metadata.recv_initial_metadata_ready =
&gim.recv_initial_metadata_ready;
IncrementRefCount("RecvInitialMetadata");
SchedulePush(&gim.recv_initial_metadata);
// Fetch trailing metadata (to catch cancellations)
@ -965,8 +1040,9 @@ class ServerStream final : public ConnectedChannelStream {
&GetContext<CallContext>()->call_stats()->transport_stream_stats;
batch_payload()->recv_trailing_metadata.recv_trailing_metadata_ready =
&gtm.recv_trailing_metadata_ready;
SchedulePush(&gtm.recv_trailing_metadata);
gtm.waker = Activity::current()->MakeOwningWaker();
IncrementRefCount("RecvTrailingMetadata");
SchedulePush(&gtm.recv_trailing_metadata);
}
Poll<ServerMetadataHandle> PollOnce() {
@ -995,6 +1071,7 @@ class ServerStream final : public ConnectedChannelStream {
.emplace<ServerMetadataHandle>(std::move(**md))
.get();
batch_payload()->send_initial_metadata.peer_string = nullptr;
IncrementRefCount("SendInitialMetadata");
SchedulePush(&send_initial_metadata_);
return true;
} else {
@ -1039,6 +1116,7 @@ class ServerStream final : public ConnectedChannelStream {
incoming_messages_ = &pipes_.client_to_server.sender;
auto promise = p->next_promise_factory(CallArgs{
std::move(p->client_initial_metadata),
ClientInitialMetadataOutstandingToken::Empty(),
&pipes_.server_initial_metadata.sender,
&pipes_.client_to_server.receiver, &pipes_.server_to_client.sender});
call_state_.emplace<MessageLoop>(
@ -1093,6 +1171,7 @@ class ServerStream final : public ConnectedChannelStream {
->as_string_view()),
StatusIntProperty::kRpcStatus, status_code);
}
IncrementRefCount("SendTrailingMetadata");
SchedulePush(&op);
}
}
@ -1188,6 +1267,7 @@ class ServerStream final : public ConnectedChannelStream {
std::move(getting.next_promise_factory)};
call_state_.emplace<GotInitialMetadata>(std::move(got));
waker.Wakeup();
Unref("RecvInitialMetadata");
}
void SendTrailingMetadataDone(absl::Status result) {
@ -1200,16 +1280,19 @@ class ServerStream final : public ConnectedChannelStream {
waker.ActivityDebugTag().c_str(), result.ToString().c_str(),
completing.sent ? "true" : "false", md->DebugString().c_str());
}
md->Set(GrpcStatusFromWire(), completing.sent);
if (!result.ok()) {
md->Clear();
md->Set(GrpcStatusMetadata(),
static_cast<grpc_status_code>(result.code()));
md->Set(GrpcMessageMetadata(), Slice::FromCopiedString(result.message()));
md->Set(GrpcStatusFromWire(), false);
md->Set(GrpcCallWasCancelled(), true);
}
if (!md->get(GrpcCallWasCancelled()).has_value()) {
md->Set(GrpcCallWasCancelled(), !completing.sent);
}
call_state_.emplace<Complete>(Complete{std::move(md)});
waker.Wakeup();
Unref("SendTrailingMetadata");
}
std::string ActiveOpsString() const override
@ -1261,7 +1344,7 @@ class ServerStream final : public ConnectedChannelStream {
return absl::StrJoin(ops, " ");
}
void SendInitialMetadataDone() {}
void SendInitialMetadataDone() { Unref("SendInitialMetadata"); }
void RecvTrailingMetadataReady(absl::Status error) {
MutexLock lock(mu());
@ -1285,6 +1368,7 @@ class ServerStream final : public ConnectedChannelStream {
client_trailing_metadata_state_.emplace<GotClientHalfClose>(
GotClientHalfClose{error});
waker.Wakeup();
Unref("RecvTrailingMetadata");
}
struct Pipes {

@ -16,6 +16,8 @@
#include "src/core/lib/channel/promise_based_filter.h"
#include <inttypes.h>
#include <algorithm>
#include <initializer_list>
#include <memory>
@ -215,7 +217,7 @@ void BaseCallData::CapturedBatch::ResumeWith(Flusher* releaser) {
// refcnt==0 ==> cancelled
if (grpc_trace_channel.enabled()) {
gpr_log(GPR_INFO, "%sRESUME BATCH REQUEST CANCELLED",
Activity::current()->DebugTag().c_str());
releaser->call()->DebugTag().c_str());
}
return;
}
@ -239,6 +241,10 @@ void BaseCallData::CapturedBatch::CancelWith(grpc_error_handle error,
auto* batch = std::exchange(batch_, nullptr);
GPR_ASSERT(batch != nullptr);
uintptr_t& refcnt = *RefCountField(batch);
gpr_log(GPR_DEBUG, "%sCancelWith: %p refs=%" PRIdPTR " err=%s [%s]",
releaser->call()->DebugTag().c_str(), batch, refcnt,
error.ToString().c_str(),
grpc_transport_stream_op_batch_string(batch).c_str());
if (refcnt == 0) {
// refcnt==0 ==> cancelled
if (grpc_trace_channel.enabled()) {
@ -325,6 +331,8 @@ const char* BaseCallData::SendMessage::StateString(State state) {
return "CANCELLED";
case State::kCancelledButNotYetPolled:
return "CANCELLED_BUT_NOT_YET_POLLED";
case State::kCancelledButNoStatus:
return "CANCELLED_BUT_NO_STATUS";
}
return "UNKNOWN";
}
@ -349,6 +357,7 @@ void BaseCallData::SendMessage::StartOp(CapturedBatch batch) {
Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_)));
case State::kCancelled:
case State::kCancelledButNotYetPolled:
case State::kCancelledButNoStatus:
return;
}
batch_ = batch;
@ -376,6 +385,7 @@ void BaseCallData::SendMessage::GotPipe(T* pipe_end) {
case State::kForwardedBatch:
case State::kBatchCompleted:
case State::kPushedToPipe:
case State::kCancelledButNoStatus:
Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_)));
case State::kCancelled:
case State::kCancelledButNotYetPolled:
@ -391,6 +401,7 @@ bool BaseCallData::SendMessage::IsIdle() const {
case State::kForwardedBatch:
case State::kCancelled:
case State::kCancelledButNotYetPolled:
case State::kCancelledButNoStatus:
return true;
case State::kGotBatchNoPipe:
case State::kGotBatch:
@ -419,6 +430,7 @@ void BaseCallData::SendMessage::OnComplete(absl::Status status) {
break;
case State::kCancelled:
case State::kCancelledButNotYetPolled:
case State::kCancelledButNoStatus:
flusher.AddClosure(intercepted_on_complete_, status,
"forward after cancel");
break;
@ -443,10 +455,14 @@ void BaseCallData::SendMessage::Done(const ServerMetadata& metadata,
case State::kCancelledButNotYetPolled:
break;
case State::kInitial:
state_ = State::kCancelled;
break;
case State::kIdle:
case State::kForwardedBatch:
state_ = State::kCancelledButNotYetPolled;
if (base_->is_current()) base_->ForceImmediateRepoll();
break;
case State::kCancelledButNoStatus:
case State::kGotBatchNoPipe:
case State::kGotBatch: {
std::string temp;
@ -465,6 +481,7 @@ void BaseCallData::SendMessage::Done(const ServerMetadata& metadata,
push_.reset();
next_.reset();
state_ = State::kCancelledButNotYetPolled;
if (base_->is_current()) base_->ForceImmediateRepoll();
break;
}
}
@ -483,6 +500,7 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher,
case State::kIdle:
case State::kGotBatchNoPipe:
case State::kCancelled:
case State::kCancelledButNoStatus:
break;
case State::kCancelledButNotYetPolled:
interceptor()->Push()->Close();
@ -524,13 +542,18 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher,
"result.has_value=%s",
base_->LogTag().c_str(), p->has_value() ? "true" : "false");
}
GPR_ASSERT(p->has_value());
batch_->payload->send_message.send_message->Swap((**p)->payload());
batch_->payload->send_message.flags = (**p)->flags();
state_ = State::kForwardedBatch;
batch_.ResumeWith(flusher);
next_.reset();
if (!absl::holds_alternative<Pending>((*push_)())) push_.reset();
if (p->has_value()) {
batch_->payload->send_message.send_message->Swap((**p)->payload());
batch_->payload->send_message.flags = (**p)->flags();
state_ = State::kForwardedBatch;
batch_.ResumeWith(flusher);
next_.reset();
if (!absl::holds_alternative<Pending>((*push_)())) push_.reset();
} else {
state_ = State::kCancelledButNoStatus;
next_.reset();
push_.reset();
}
}
} break;
case State::kForwardedBatch:
@ -1090,11 +1113,14 @@ class ClientCallData::PollContext {
// Poll the promise once since we're waiting for it.
Poll<ServerMetadataHandle> poll = self_->promise_();
if (grpc_trace_channel.enabled()) {
gpr_log(GPR_INFO, "%s ClientCallData.PollContext.Run: poll=%s",
gpr_log(GPR_INFO, "%s ClientCallData.PollContext.Run: poll=%s; %s",
self_->LogTag().c_str(),
PollToString(poll, [](const ServerMetadataHandle& h) {
return h->DebugString();
}).c_str());
PollToString(poll,
[](const ServerMetadataHandle& h) {
return h->DebugString();
})
.c_str(),
self_->DebugString().c_str());
}
if (auto* r = absl::get_if<ServerMetadataHandle>(&poll)) {
auto md = std::move(*r);
@ -1274,7 +1300,11 @@ ClientCallData::ClientCallData(grpc_call_element* elem,
[args]() {
return args->arena->New<ReceiveInterceptor>(args->arena);
},
[args]() { return args->arena->New<SendInterceptor>(args->arena); }) {
[args]() { return args->arena->New<SendInterceptor>(args->arena); }),
initial_metadata_outstanding_token_(
(flags & kFilterIsLast) != 0
? ClientInitialMetadataOutstandingToken::New(arena())
: ClientInitialMetadataOutstandingToken::Empty()) {
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReadyCallback, this,
grpc_schedule_on_exec_ctx);
@ -1543,6 +1573,7 @@ void ClientCallData::StartPromise(Flusher* flusher) {
promise_ = filter->MakeCallPromise(
CallArgs{WrapMetadata(send_initial_metadata_batch_->payload
->send_initial_metadata.send_initial_metadata),
std::move(initial_metadata_outstanding_token_),
server_initial_metadata_pipe() == nullptr
? nullptr
: &server_initial_metadata_pipe()->sender,
@ -1863,8 +1894,15 @@ struct ServerCallData::SendInitialMetadata {
class ServerCallData::PollContext {
public:
explicit PollContext(ServerCallData* self, Flusher* flusher)
: self_(self), flusher_(flusher) {
explicit PollContext(ServerCallData* self, Flusher* flusher,
DebugLocation created = DebugLocation())
: self_(self), flusher_(flusher), created_(created) {
if (self_->poll_ctx_ != nullptr) {
Crash(absl::StrCat(
"PollContext: disallowed recursion. New: ", created_.file(), ":",
created_.line(), "; Old: ", self_->poll_ctx_->created_.file(), ":",
self_->poll_ctx_->created_.line()));
}
GPR_ASSERT(self_->poll_ctx_ == nullptr);
self_->poll_ctx_ = this;
scoped_activity_.Init(self_);
@ -1910,6 +1948,7 @@ class ServerCallData::PollContext {
Flusher* const flusher_;
bool repoll_ = false;
bool have_scoped_activity_;
GPR_NO_UNIQUE_ADDRESS DebugLocation created_;
};
const char* ServerCallData::StateString(RecvInitialState state) {
@ -2079,7 +2118,10 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
switch (send_trailing_state_) {
case SendTrailingState::kInitial:
send_trailing_metadata_batch_ = batch;
if (receive_message() != nullptr) {
if (receive_message() != nullptr &&
batch->payload->send_trailing_metadata.send_trailing_metadata
->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) {
receive_message()->Done(
*batch->payload->send_trailing_metadata.send_trailing_metadata,
&flusher);
@ -2138,6 +2180,7 @@ void ServerCallData::Completed(grpc_error_handle error, Flusher* flusher) {
if (!error.ok()) {
auto* batch = grpc_make_transport_stream_op(
NewClosure([call_combiner = call_combiner()](absl::Status) {
gpr_log(GPR_DEBUG, "ON COMPLETE DONE");
GRPC_CALL_COMBINER_STOP(call_combiner, "done-cancel");
}));
batch->cancel_stream = true;
@ -2312,6 +2355,7 @@ void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) {
FakeActivity(this).Run([this, filter] {
promise_ = filter->MakeCallPromise(
CallArgs{WrapMetadata(recv_initial_metadata_),
ClientInitialMetadataOutstandingToken::Empty(),
server_initial_metadata_pipe() == nullptr
? nullptr
: &server_initial_metadata_pipe()->sender,
@ -2413,9 +2457,14 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
(send_trailing_metadata_batch_->send_message &&
send_message()->IsForwarded()))) {
send_trailing_state_ = SendTrailingState::kQueued;
send_message()->Done(*send_trailing_metadata_batch_->payload
->send_trailing_metadata.send_trailing_metadata,
flusher);
if (send_trailing_metadata_batch_->payload->send_trailing_metadata
.send_trailing_metadata->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) {
send_message()->Done(
*send_trailing_metadata_batch_->payload->send_trailing_metadata
.send_trailing_metadata,
flusher);
}
}
}
if (receive_message() != nullptr) {

@ -218,7 +218,11 @@ class BaseCallData : public Activity, private Wakeable {
void Resume(grpc_transport_stream_op_batch* batch) {
GPR_ASSERT(!call_->is_last());
release_.push_back(batch);
if (batch->HasOp()) {
release_.push_back(batch);
} else if (batch->on_complete != nullptr) {
Complete(batch);
}
}
void Cancel(grpc_transport_stream_op_batch* batch,
@ -237,6 +241,8 @@ class BaseCallData : public Activity, private Wakeable {
call_closures_.Add(closure, error, reason);
}
BaseCallData* call() const { return call_; }
private:
absl::InlinedVector<grpc_transport_stream_op_batch*, 1> release_;
CallCombinerClosureList call_closures_;
@ -398,6 +404,8 @@ class BaseCallData : public Activity, private Wakeable {
kCancelledButNotYetPolled,
// We're done.
kCancelled,
// We're done, but we haven't gotten a status yet
kCancelledButNoStatus,
};
static const char* StateString(State);
@ -664,6 +672,8 @@ class ClientCallData : public BaseCallData {
RecvTrailingState recv_trailing_state_ = RecvTrailingState::kInitial;
// Polling related data. Non-null if we're actively polling
PollContext* poll_ctx_ = nullptr;
// Initial metadata outstanding token
ClientInitialMetadataOutstandingToken initial_metadata_outstanding_token_;
};
class ServerCallData : public BaseCallData {

@ -171,8 +171,8 @@ class CallCombinerClosureList {
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
gpr_log(GPR_INFO,
"CallCombinerClosureList executing closure while already "
"holding call_combiner %p: closure=%p error=%s reason=%s",
call_combiner, closures_[0].closure,
"holding call_combiner %p: closure=%s error=%s reason=%s",
call_combiner, closures_[0].closure->DebugString().c_str(),
StatusToString(closures_[0].error).c_str(), closures_[0].reason);
}
// This will release the call combiner.

@ -17,6 +17,7 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include <type_traits>
#include <utility>
@ -106,6 +107,9 @@ class Curried {
private:
GPR_NO_UNIQUE_ADDRESS F f_;
GPR_NO_UNIQUE_ADDRESS Arg arg_;
#ifndef NDEBUG
std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
#endif
};
// Promote a callable(A) -> T | Poll<T> to a PromiseFactory(A) -> Promise<T> by

@ -140,7 +140,8 @@ class InterceptorList {
async_resolution_.space.get());
async_resolution_.current_factory =
async_resolution_.current_factory->next();
if (async_resolution_.current_factory == nullptr || !p->has_value()) {
if (!p->has_value()) async_resolution_.current_factory = nullptr;
if (async_resolution_.current_factory == nullptr) {
return std::move(*p);
}
async_resolution_.current_factory->MakePromise(

@ -89,6 +89,8 @@ class Latch {
waiter_.Wake();
}
bool is_set() const { return has_value_; }
private:
std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " LATCH[0x",
@ -165,7 +167,7 @@ class Latch<void> {
private:
std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " LATCH[0x",
return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x",
reinterpret_cast<uintptr_t>(this), "]: ");
}

@ -18,12 +18,12 @@
#include <grpc/support/port_platform.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include "absl/status/status.h"
@ -41,6 +41,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
@ -57,6 +58,7 @@
#include "src/core/lib/security/transport/auth_filters.h" // IWYU pragma: keep
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
@ -120,12 +122,28 @@ class ServerAuthFilter::RunApplicationCode {
// memory later
RunApplicationCode(ServerAuthFilter* filter, CallArgs call_args)
: state_(GetContext<Arena>()->ManagedNew<State>(std::move(call_args))) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_ERROR,
"%s[server-auth]: Delegate to application: filter=%p this=%p "
"auth_ctx=%p",
Activity::current()->DebugTag().c_str(), filter, this,
filter->auth_context_.get());
}
filter->server_credentials_->auth_metadata_processor().process(
filter->server_credentials_->auth_metadata_processor().state,
filter->auth_context_.get(), state_->md.metadata, state_->md.count,
OnMdProcessingDone, state_);
}
RunApplicationCode(const RunApplicationCode&) = delete;
RunApplicationCode& operator=(const RunApplicationCode&) = delete;
RunApplicationCode(RunApplicationCode&& other) noexcept
: state_(std::exchange(other.state_, nullptr)) {}
RunApplicationCode& operator=(RunApplicationCode&& other) noexcept {
state_ = std::exchange(other.state_, nullptr);
return *this;
}
Poll<absl::StatusOr<CallArgs>> operator()() {
if (state_->done.load(std::memory_order_acquire)) {
return Poll<absl::StatusOr<CallArgs>>(std::move(state_->call_args));

@ -480,7 +480,7 @@ int grpc_slice_slice(grpc_slice haystack, grpc_slice needle) {
}
const uint8_t* last = haystack_bytes + haystack_len - needle_len;
for (const uint8_t* cur = haystack_bytes; cur != last; ++cur) {
for (const uint8_t* cur = haystack_bytes; cur <= last; ++cur) {
if (0 == memcmp(cur, needle_bytes, needle_len)) {
return static_cast<int>(cur - haystack_bytes);
}

File diff suppressed because it is too large Load Diff

@ -79,6 +79,7 @@ ArenaPromise<ServerMetadataHandle> LameClientFilter::MakeCallPromise(
if (args.server_to_client_messages != nullptr) {
args.server_to_client_messages->Close();
}
args.client_initial_metadata_outstanding.Complete(true);
return Immediate(ServerMetadataFromStatus(error_));
}

@ -396,6 +396,15 @@ struct GrpcStatusFromWire {
static absl::string_view DisplayValue(bool x) { return x ? "true" : "false"; }
};
// Annotation to denote that this call qualifies for cancelled=1 for the
// RECV_CLOSE_ON_SERVER op
struct GrpcCallWasCancelled {
static absl::string_view DebugKey() { return "GrpcCallWasCancelled"; }
static constexpr bool kRepeatable = false;
using ValueType = bool;
static absl::string_view DisplayValue(bool x) { return x ? "true" : "false"; }
};
// Annotation added by client surface code to denote wait-for-ready state
struct WaitForReady {
struct ValueType {
@ -1327,7 +1336,8 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
// Non-encodable things
grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext, grpc_core::GrpcStatusFromWire,
grpc_core::WaitForReady, grpc_core::GrpcTrailersOnly>;
grpc_core::GrpcCallWasCancelled, grpc_core::WaitForReady,
grpc_core::GrpcTrailersOnly>;
struct grpc_metadata_batch : public grpc_metadata_batch_base {
using grpc_metadata_batch_base::grpc_metadata_batch_base;

@ -27,6 +27,7 @@
#include <functional>
#include <string>
#include <type_traits>
#include <utility>
#include "absl/status/status.h"
@ -54,6 +55,7 @@
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -144,11 +146,70 @@ struct StatusCastImpl<ServerMetadataHandle, absl::Status&> {
}
};
// Move only type that tracks call startup.
// Allows observation of when client_initial_metadata has been processed by the
// end of the local call stack.
// Interested observers can call Wait() to obtain a promise that will resolve
// when all local client_initial_metadata processing has completed.
// The result of this token is either true on successful completion, or false
// if the metadata was not sent.
// To set a successful completion, call Complete(true). For failure, call
// Complete(false).
// If Complete is not called, the destructor of a still held token will complete
// with failure.
// Transports should hold this token until client_initial_metadata has passed
// any flow control (eg MAX_CONCURRENT_STREAMS for http2).
class ClientInitialMetadataOutstandingToken {
public:
static ClientInitialMetadataOutstandingToken Empty() {
return ClientInitialMetadataOutstandingToken();
}
static ClientInitialMetadataOutstandingToken New(
Arena* arena = GetContext<Arena>()) {
ClientInitialMetadataOutstandingToken token;
token.latch_ = arena->New<Latch<bool>>();
return token;
}
ClientInitialMetadataOutstandingToken(
const ClientInitialMetadataOutstandingToken&) = delete;
ClientInitialMetadataOutstandingToken& operator=(
const ClientInitialMetadataOutstandingToken&) = delete;
ClientInitialMetadataOutstandingToken(
ClientInitialMetadataOutstandingToken&& other) noexcept
: latch_(std::exchange(other.latch_, nullptr)) {}
ClientInitialMetadataOutstandingToken& operator=(
ClientInitialMetadataOutstandingToken&& other) noexcept {
latch_ = std::exchange(other.latch_, nullptr);
return *this;
}
~ClientInitialMetadataOutstandingToken() {
if (latch_ != nullptr) latch_->Set(false);
}
void Complete(bool success) { std::exchange(latch_, nullptr)->Set(success); }
// Returns a promise that will resolve when this object (or its moved-from
// ancestor) is dropped.
auto Wait() { return latch_->Wait(); }
private:
ClientInitialMetadataOutstandingToken() = default;
Latch<bool>* latch_ = nullptr;
};
using ClientInitialMetadataOutstandingTokenWaitType =
decltype(std::declval<ClientInitialMetadataOutstandingToken>().Wait());
struct CallArgs {
// Initial metadata from the client to the server.
// During promise setup this can be manipulated by filters (and then
// passed on to the next filter).
ClientMetadataHandle client_initial_metadata;
// Token indicating that client_initial_metadata is still being processed.
// This should be moved around and only destroyed when the transport is
// satisfied that the metadata has passed any flow control measures it has.
ClientInitialMetadataOutstandingToken client_initial_metadata_outstanding;
// Initial metadata from the server to the client.
// Set once when it's available.
// During promise setup filters can substitute their own latch for this
@ -331,6 +392,12 @@ struct grpc_transport_stream_op_batch {
/// Is this stream traced
bool is_traced : 1;
bool HasOp() const {
return send_initial_metadata || send_trailing_metadata || send_message ||
recv_initial_metadata || recv_message || recv_trailing_metadata ||
cancel_stream;
}
//**************************************************************************
// remaining fields are initialized and used at the discretion of the
// current handler of the op

@ -45,8 +45,6 @@
#define SERVER_KEY_PATH "src/core/tsi/test_creds/server1.key"
static const char oauth2_md[] = "Bearer aaslkfjs424535asdf";
static const char* client_identity_property_name = "smurf_name";
static const char* client_identity = "Brainy Smurf";
struct fullstack_secure_fixture_data {
std::string localaddr;
@ -70,7 +68,7 @@ typedef struct {
size_t pseudo_refcount;
} test_processor_state;
static void process_oauth2_success(void* state, grpc_auth_context* ctx,
static void process_oauth2_success(void* state, grpc_auth_context*,
const grpc_metadata* md, size_t md_count,
grpc_process_auth_metadata_done_cb cb,
void* user_data) {
@ -82,10 +80,6 @@ static void process_oauth2_success(void* state, grpc_auth_context* ctx,
s = static_cast<test_processor_state*>(state);
GPR_ASSERT(s->pseudo_refcount == 1);
GPR_ASSERT(oauth2 != nullptr);
grpc_auth_context_add_cstring_property(ctx, client_identity_property_name,
client_identity);
GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(
ctx, client_identity_property_name) == 1);
cb(user_data, oauth2, 1, nullptr, 0, GRPC_STATUS_OK, nullptr);
}

@ -45,8 +45,6 @@
#define SERVER_KEY_PATH "src/core/tsi/test_creds/server1.key"
static const char oauth2_md[] = "Bearer aaslkfjs424535asdf";
static const char* client_identity_property_name = "smurf_name";
static const char* client_identity = "Brainy Smurf";
struct fullstack_secure_fixture_data {
std::string localaddr;
@ -70,7 +68,7 @@ typedef struct {
size_t pseudo_refcount;
} test_processor_state;
static void process_oauth2_success(void* state, grpc_auth_context* ctx,
static void process_oauth2_success(void* state, grpc_auth_context*,
const grpc_metadata* md, size_t md_count,
grpc_process_auth_metadata_done_cb cb,
void* user_data) {
@ -82,10 +80,6 @@ static void process_oauth2_success(void* state, grpc_auth_context* ctx,
s = static_cast<test_processor_state*>(state);
GPR_ASSERT(s->pseudo_refcount == 1);
GPR_ASSERT(oauth2 != nullptr);
grpc_auth_context_add_cstring_property(ctx, client_identity_property_name,
client_identity);
GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(
ctx, client_identity_property_name) == 1);
cb(user_data, oauth2, 1, nullptr, 0, GRPC_STATUS_OK, nullptr);
}

@ -210,7 +210,7 @@ static void on_p2s_sent_message(void* arg, int success) {
grpc_op op;
grpc_call_error err;
grpc_byte_buffer_destroy(pc->c2p_msg);
grpc_byte_buffer_destroy(std::exchange(pc->c2p_msg, nullptr));
if (!pc->proxy->shutdown && success) {
op.op = GRPC_OP_RECV_MESSAGE;
op.flags = 0;

@ -21,6 +21,7 @@
#include <string.h>
#include <algorithm>
#include <memory>
#include <vector>
#include "absl/status/status.h"
@ -40,7 +41,10 @@
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/util/test_config.h"
@ -448,12 +452,23 @@ static grpc_error_handle init_channel_elem(
static void destroy_channel_elem(grpc_channel_element* /*elem*/) {}
static const grpc_channel_filter test_filter = {
grpc_call_next_op, nullptr,
grpc_channel_next_op, 0,
init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem, 0,
init_channel_elem, grpc_channel_stack_no_post_init,
destroy_channel_elem, grpc_channel_next_get_info,
grpc_call_next_op,
[](grpc_channel_element*, grpc_core::CallArgs,
grpc_core::NextPromiseFactory)
-> grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> {
return grpc_core::Immediate(grpc_core::ServerMetadataFromStatus(
absl::PermissionDeniedError("access denied")));
},
grpc_channel_next_op,
0,
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0,
init_channel_elem,
grpc_channel_stack_no_post_init,
destroy_channel_elem,
grpc_channel_next_get_info,
"filter_init_fails"};
//******************************************************************************

@ -128,6 +128,9 @@ static void test_max_message_length_on_request(grpc_end2end_test_config config,
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
grpc_slice expect_in_details = grpc_slice_from_copied_string(
send_limit ? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)");
int was_cancelled = 2;
grpc_channel_args* client_args = nullptr;
@ -266,13 +269,10 @@ static void test_max_message_length_on_request(grpc_end2end_test_config config,
done:
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(
grpc_slice_str_cmp(
details, send_limit
? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)") == 0);
GPR_ASSERT(grpc_slice_slice(details, expect_in_details) >= 0);
grpc_slice_unref(details);
grpc_slice_unref(expect_in_details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
@ -316,6 +316,9 @@ static void test_max_message_length_on_response(grpc_end2end_test_config config,
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
grpc_slice expect_in_details = grpc_slice_from_copied_string(
send_limit ? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)");
int was_cancelled = 2;
grpc_channel_args* client_args = nullptr;
@ -455,13 +458,10 @@ static void test_max_message_length_on_response(grpc_end2end_test_config config,
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(
grpc_slice_str_cmp(
details, send_limit
? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)") == 0);
GPR_ASSERT(grpc_slice_slice(details, expect_in_details) >= 0);
grpc_slice_unref(details);
grpc_slice_unref(expect_in_details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);

@ -40,10 +40,14 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args,
bool request_status_early) {
bool request_status_early,
bool recv_message_separately) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s/request_status_early=%s", test_name,
config.name, request_status_early ? "true" : "false");
gpr_log(
GPR_INFO,
"Running test: %s/%s/request_status_early=%s/recv_message_separately=%s",
test_name, config.name, request_status_early ? "true" : "false",
recv_message_separately ? "true" : "false");
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
@ -108,7 +112,7 @@ static void test(grpc_end2end_test_config config, bool request_status_early,
grpc_raw_byte_buffer_create(&response_payload2_slice, 1);
grpc_end2end_test_fixture f =
begin_test(config, "streaming_error_response", nullptr, nullptr,
request_status_early);
request_status_early, recv_message_separately);
grpc_core::CqVerifier cqv(f.cq);
grpc_op ops[6];
grpc_op* op;

@ -155,7 +155,8 @@ TEST_F(ClientAuthFilterTest, CallCredsFails) {
auto promise = filter->MakeCallPromise(
CallArgs{ClientMetadataHandle(&initial_metadata_batch_,
Arena::PooledDeleter(nullptr)),
nullptr, nullptr, nullptr},
ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr,
nullptr},
[&](CallArgs /*call_args*/) {
return ArenaPromise<ServerMetadataHandle>(
[&]() -> Poll<ServerMetadataHandle> {
@ -185,7 +186,8 @@ TEST_F(ClientAuthFilterTest, RewritesInvalidStatusFromCallCreds) {
auto promise = filter->MakeCallPromise(
CallArgs{ClientMetadataHandle(&initial_metadata_batch_,
Arena::PooledDeleter(nullptr)),
nullptr, nullptr, nullptr},
ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr,
nullptr},
[&](CallArgs /*call_args*/) {
return ArenaPromise<ServerMetadataHandle>(
[&]() -> Poll<ServerMetadataHandle> {

@ -72,7 +72,8 @@ TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) {
auto promise = filter.MakeCallPromise(
CallArgs{ClientMetadataHandle(&initial_metadata_batch,
Arena::PooledDeleter(nullptr)),
nullptr, nullptr, nullptr},
ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr,
nullptr},
[&](CallArgs call_args) {
EXPECT_EQ(call_args.client_initial_metadata
->get_pointer(HttpAuthorityMetadata())
@ -107,7 +108,8 @@ TEST(ClientAuthorityFilterTest,
auto promise = filter.MakeCallPromise(
CallArgs{ClientMetadataHandle(&initial_metadata_batch,
Arena::PooledDeleter(nullptr)),
nullptr, nullptr, nullptr},
ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr,
nullptr},
[&](CallArgs call_args) {
EXPECT_EQ(call_args.client_initial_metadata
->get_pointer(HttpAuthorityMetadata())

@ -477,6 +477,7 @@ class MainLoop {
auto* server_initial_metadata = arena_->New<Pipe<ServerMetadataHandle>>();
CallArgs call_args{std::move(*LoadMetadata(client_initial_metadata,
&client_initial_metadata_)),
ClientInitialMetadataOutstandingToken::Empty(),
&server_initial_metadata->sender, nullptr, nullptr};
if (is_client) {
promise_ = main_loop_->channel_stack_->MakeClientCallPromise(

Loading…
Cancel
Save