Revert "[promises] Rollforward: Finish of server side calls (#32347)" (#32394)

There were some rollback conflicts, so this isn't a pure rollback.

This reverts commit ba0e55f539.




<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32341/head^2
Craig Tiller 2 years ago committed by GitHub
parent 4777db3003
commit 241e8ed417
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 2
      build_autogenerated.yaml
  3. 15
      src/core/BUILD
  4. 6
      src/core/ext/filters/http/client/http_client_filter.cc
  5. 13
      src/core/ext/filters/http/message_compress/compression_filter.cc
  6. 365
      src/core/ext/filters/message_size/message_size_filter.cc
  7. 51
      src/core/ext/filters/message_size/message_size_filter.h
  8. 2
      src/core/ext/transport/inproc/inproc_transport.cc
  9. 166
      src/core/lib/channel/connected_channel.cc
  10. 91
      src/core/lib/channel/promise_based_filter.cc
  11. 12
      src/core/lib/channel/promise_based_filter.h
  12. 4
      src/core/lib/iomgr/call_combiner.h
  13. 4
      src/core/lib/promise/detail/promise_factory.h
  14. 3
      src/core/lib/promise/interceptor_list.h
  15. 4
      src/core/lib/promise/latch.h
  16. 22
      src/core/lib/security/transport/server_auth_filter.cc
  17. 2
      src/core/lib/slice/slice.cc
  18. 679
      src/core/lib/surface/call.cc
  19. 1
      src/core/lib/surface/lame_client.cc
  20. 12
      src/core/lib/transport/metadata_batch.h
  21. 67
      src/core/lib/transport/transport.h
  22. 8
      test/core/end2end/fixtures/h2_oauth2_tls12.cc
  23. 8
      test/core/end2end/fixtures/h2_oauth2_tls13.cc
  24. 2
      test/core/end2end/fixtures/proxy.cc
  25. 27
      test/core/end2end/tests/filter_init_fails.cc
  26. 20
      test/core/end2end/tests/max_message_length.cc
  27. 12
      test/core/end2end/tests/streaming_error_response.cc
  28. 6
      test/core/filters/client_auth_filter_test.cc
  29. 6
      test/core/filters/client_authority_filter_test.cc
  30. 1
      test/core/filters/filter_fuzzer.cc

@ -1487,7 +1487,6 @@ grpc_cc_library(
"//src/core:iomgr_fwd", "//src/core:iomgr_fwd",
"//src/core:iomgr_port", "//src/core:iomgr_port",
"//src/core:json", "//src/core:json",
"//src/core:latch",
"//src/core:map", "//src/core:map",
"//src/core:match", "//src/core:match",
"//src/core:memory_quota", "//src/core:memory_quota",

@ -3730,7 +3730,6 @@ libs:
- src/core/lib/promise/if.h - src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h - src/core/lib/promise/interceptor_list.h
- src/core/lib/promise/intra_activity_waiter.h - src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h - src/core/lib/promise/loop.h
- src/core/lib/promise/map.h - src/core/lib/promise/map.h
- src/core/lib/promise/pipe.h - src/core/lib/promise/pipe.h
@ -7544,7 +7543,6 @@ targets:
- src/core/lib/promise/if.h - src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h - src/core/lib/promise/interceptor_list.h
- src/core/lib/promise/intra_activity_waiter.h - src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h - src/core/lib/promise/loop.h
- src/core/lib/promise/map.h - src/core/lib/promise/map.h
- src/core/lib/promise/pipe.h - src/core/lib/promise/pipe.h

@ -3460,38 +3460,33 @@ grpc_cc_library(
"ext/filters/message_size/message_size_filter.h", "ext/filters/message_size/message_size_filter.h",
], ],
external_deps = [ external_deps = [
"absl/status:statusor", "absl/status",
"absl/strings", "absl/strings",
"absl/strings:str_format", "absl/strings:str_format",
"absl/types:optional", "absl/types:optional",
], ],
language = "c++", language = "c++",
deps = [ deps = [
"activity",
"arena",
"arena_promise",
"channel_args", "channel_args",
"channel_fwd", "channel_fwd",
"channel_init", "channel_init",
"channel_stack_type", "channel_stack_type",
"context", "closure",
"error",
"grpc_service_config", "grpc_service_config",
"json", "json",
"json_args", "json_args",
"json_object_loader", "json_object_loader",
"latch",
"poll",
"race",
"service_config_parser", "service_config_parser",
"slice",
"slice_buffer", "slice_buffer",
"status_helper",
"validation_errors", "validation_errors",
"//:channel_stack_builder", "//:channel_stack_builder",
"//:config", "//:config",
"//:debug_location",
"//:gpr", "//:gpr",
"//:grpc_base", "//:grpc_base",
"//:grpc_public_hdrs", "//:grpc_public_hdrs",
"//:grpc_trace",
], ],
) )

@ -133,13 +133,13 @@ ArenaPromise<ServerMetadataHandle> HttpClientFilter::MakeCallPromise(
return std::move(md); return std::move(md);
}); });
return Race(initial_metadata_err->Wait(), return Race(Map(next_promise_factory(std::move(call_args)),
Map(next_promise_factory(std::move(call_args)),
[](ServerMetadataHandle md) -> ServerMetadataHandle { [](ServerMetadataHandle md) -> ServerMetadataHandle {
auto r = CheckServerMetadata(md.get()); auto r = CheckServerMetadata(md.get());
if (!r.ok()) return ServerMetadataFromStatus(r); if (!r.ok()) return ServerMetadataFromStatus(r);
return md; return md;
})); }),
initial_metadata_err->Wait());
} }
HttpClientFilter::HttpClientFilter(HttpSchemeMetadata::ValueType scheme, HttpClientFilter::HttpClientFilter(HttpSchemeMetadata::ValueType scheme,

@ -233,7 +233,7 @@ ArenaPromise<ServerMetadataHandle> ClientCompressionFilter::MakeCallPromise(
return CompressMessage(std::move(message), compression_algorithm); return CompressMessage(std::move(message), compression_algorithm);
}); });
auto* decompress_args = GetContext<Arena>()->New<DecompressArgs>( auto* decompress_args = GetContext<Arena>()->New<DecompressArgs>(
DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt}); DecompressArgs{GRPC_COMPRESS_NONE, absl::nullopt});
auto* decompress_err = auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>(); GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.server_initial_metadata->InterceptAndMap( call_args.server_initial_metadata->InterceptAndMap(
@ -254,8 +254,8 @@ ArenaPromise<ServerMetadataHandle> ClientCompressionFilter::MakeCallPromise(
return std::move(*r); return std::move(*r);
}); });
// Run the next filter, and race it with getting an error from decompression. // Run the next filter, and race it with getting an error from decompression.
return Race(decompress_err->Wait(), return Race(next_promise_factory(std::move(call_args)),
next_promise_factory(std::move(call_args))); decompress_err->Wait());
} }
ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise( ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
@ -269,8 +269,7 @@ ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
this](MessageHandle message) -> absl::optional<MessageHandle> { this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), decompress_args); auto r = DecompressMessage(std::move(message), decompress_args);
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s", gpr_log(GPR_DEBUG, "DecompressMessage returned %s",
Activity::current()->DebugTag().c_str(),
r.status().ToString().c_str()); r.status().ToString().c_str());
} }
if (!r.ok()) { if (!r.ok()) {
@ -301,8 +300,8 @@ ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
// - decompress incoming messages // - decompress incoming messages
// - wait for initial metadata to be sent, and then commence compression of // - wait for initial metadata to be sent, and then commence compression of
// outgoing messages // outgoing messages
return Race(decompress_err->Wait(), return Race(next_promise_factory(std::move(call_args)),
next_promise_factory(std::move(call_args))); decompress_err->Wait());
} }
} // namespace grpc_core } // namespace grpc_core

@ -18,13 +18,10 @@
#include "src/core/ext/filters/message_size/message_size_filter.h" #include "src/core/ext/filters/message_size/message_size_filter.h"
#include <inttypes.h>
#include <functional>
#include <initializer_list> #include <initializer_list>
#include <string> #include <new>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/str_format.h" #include "absl/strings/str_format.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
@ -35,22 +32,21 @@
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/promise/activity.h" #include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/promise/context.h" #include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/promise/latch.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/promise/poll.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config_call_data.h" #include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_init.h" #include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
static void recv_message_ready(void* user_data, grpc_error_handle error);
static void recv_trailing_metadata_ready(void* user_data,
grpc_error_handle error);
namespace grpc_core { namespace grpc_core {
// //
@ -128,164 +124,251 @@ size_t MessageSizeParser::ParserIndex() {
parser_name()); parser_name());
} }
// } // namespace grpc_core
// MessageSizeFilter
//
const grpc_channel_filter ClientMessageSizeFilter::kFilter = namespace {
MakePromiseBasedFilter<ClientMessageSizeFilter, FilterEndpoint::kClient, struct channel_data {
kFilterExaminesOutboundMessages | grpc_core::MessageSizeParsedConfig limits;
kFilterExaminesInboundMessages>("message_size"); const size_t service_config_parser_index{
const grpc_channel_filter ServerMessageSizeFilter::kFilter = grpc_core::MessageSizeParser::ParserIndex()};
MakePromiseBasedFilter<ServerMessageSizeFilter, FilterEndpoint::kServer, };
kFilterExaminesOutboundMessages |
kFilterExaminesInboundMessages>("message_size"); struct call_data {
call_data(grpc_call_element* elem, const channel_data& chand,
class MessageSizeFilter::CallBuilder { const grpc_call_element_args& args)
private: : call_combiner(args.call_combiner), limits(chand.limits) {
auto Interceptor(uint32_t max_length, bool is_send) { GRPC_CLOSURE_INIT(&recv_message_ready, ::recv_message_ready, elem,
return [max_length, is_send, grpc_schedule_on_exec_ctx);
err = err_](MessageHandle msg) -> absl::optional<MessageHandle> { GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
if (grpc_call_trace.enabled()) { ::recv_trailing_metadata_ready, elem,
gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d", grpc_schedule_on_exec_ctx);
Activity::current()->DebugTag().c_str(), // Get max sizes from channel data, then merge in per-method config values.
is_send ? "send" : "recv", msg->payload()->Length(), // Note: Per-method config is only available on the client, so we
max_length); // apply the max request size to the send limit and the max response
// size to the receive limit.
const grpc_core::MessageSizeParsedConfig* config_from_call_context =
grpc_core::MessageSizeParsedConfig::GetFromCallContext(
args.context, chand.service_config_parser_index);
if (config_from_call_context != nullptr) {
absl::optional<uint32_t> max_send_size = limits.max_send_size();
absl::optional<uint32_t> max_recv_size = limits.max_recv_size();
if (config_from_call_context->max_send_size().has_value() &&
(!max_send_size.has_value() ||
*config_from_call_context->max_send_size() < *max_send_size)) {
max_send_size = *config_from_call_context->max_send_size();
} }
if (msg->payload()->Length() > max_length) { if (config_from_call_context->max_recv_size().has_value() &&
if (err->is_set()) return std::move(msg); (!max_recv_size.has_value() ||
auto r = GetContext<Arena>()->MakePooled<ServerMetadata>( *config_from_call_context->max_recv_size() < *max_recv_size)) {
GetContext<Arena>()); max_recv_size = *config_from_call_context->max_recv_size();
r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED);
r->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(
absl::StrFormat("%s message larger than max (%u vs. %d)",
is_send ? "Sent" : "Received",
msg->payload()->Length(), max_length)));
err->Set(std::move(r));
return absl::nullopt;
} }
return std::move(msg); limits = grpc_core::MessageSizeParsedConfig(max_send_size, max_recv_size);
}; }
} }
public: ~call_data() {}
explicit CallBuilder(const MessageSizeParsedConfig& limits)
: limits_(limits) {} grpc_core::CallCombiner* call_combiner;
grpc_core::MessageSizeParsedConfig limits;
// Receive closures are chained: we inject this closure as the
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
grpc_closure recv_message_ready;
grpc_closure recv_trailing_metadata_ready;
// The error caused by a message that is too large, or absl::OkStatus()
grpc_error_handle error;
// Used by recv_message_ready.
absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready = nullptr;
// Original recv_trailing_metadata callback, invoked after our own.
grpc_closure* original_recv_trailing_metadata_ready;
bool seen_recv_trailing_metadata = false;
grpc_error_handle recv_trailing_metadata_error;
};
} // namespace
template <typename T> // Callback invoked when we receive a message. Here we check the max
void AddSend(T* pipe_end) { // receive message size.
if (!limits_.max_send_size().has_value()) return; static void recv_message_ready(void* user_data, grpc_error_handle error) {
pipe_end->InterceptAndMap(Interceptor(*limits_.max_send_size(), true)); grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->recv_message->has_value() &&
calld->limits.max_recv_size().has_value() &&
(*calld->recv_message)->Length() >
static_cast<size_t>(*calld->limits.max_recv_size())) {
grpc_error_handle new_error = grpc_error_set_int(
GRPC_ERROR_CREATE(absl::StrFormat(
"Received message larger than max (%u vs. %d)",
(*calld->recv_message)->Length(), *calld->limits.max_recv_size())),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_RESOURCE_EXHAUSTED);
error = grpc_error_add_child(error, new_error);
calld->error = error;
} }
template <typename T> // Invoke the next callback.
void AddRecv(T* pipe_end) { grpc_closure* closure = calld->next_recv_message_ready;
if (!limits_.max_recv_size().has_value()) return; calld->next_recv_message_ready = nullptr;
pipe_end->InterceptAndMap(Interceptor(*limits_.max_recv_size(), false)); if (calld->seen_recv_trailing_metadata) {
// We might potentially see another RECV_MESSAGE op. In that case, we do not
// want to run the recv_trailing_metadata_ready closure again. The newer
// RECV_MESSAGE op cannot cause any errors since the transport has already
// invoked the recv_trailing_metadata_ready closure and all further
// RECV_MESSAGE ops will get null payloads.
calld->seen_recv_trailing_metadata = false;
GRPC_CALL_COMBINER_START(calld->call_combiner,
&calld->recv_trailing_metadata_ready,
calld->recv_trailing_metadata_error,
"continue recv_trailing_metadata_ready");
} }
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
}
ArenaPromise<ServerMetadataHandle> Run( // Callback invoked on completion of recv_trailing_metadata
CallArgs call_args, NextPromiseFactory next_promise_factory) { // Notifies the recv_trailing_metadata batch of any message size failures
return Race(err_->Wait(), next_promise_factory(std::move(call_args))); static void recv_trailing_metadata_ready(void* user_data,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->next_recv_message_ready != nullptr) {
calld->seen_recv_trailing_metadata = true;
calld->recv_trailing_metadata_error = error;
GRPC_CALL_COMBINER_STOP(calld->call_combiner,
"deferring recv_trailing_metadata_ready until "
"after recv_message_ready");
return;
} }
error = grpc_error_add_child(error, calld->error);
// Invoke the next callback.
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready, error);
}
private: // Start transport stream op.
Latch<ServerMetadataHandle>* const err_ = static void message_size_start_transport_stream_op_batch(
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>(); grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
MessageSizeParsedConfig limits_; call_data* calld = static_cast<call_data*>(elem->call_data);
}; // Check max send message size.
if (op->send_message && calld->limits.max_send_size().has_value() &&
absl::StatusOr<ClientMessageSizeFilter> ClientMessageSizeFilter::Create( op->payload->send_message.send_message->Length() >
const ChannelArgs& args, ChannelFilter::Args) { static_cast<size_t>(*calld->limits.max_send_size())) {
return ClientMessageSizeFilter(args); grpc_transport_stream_op_batch_finish_with_failure(
op,
grpc_error_set_int(GRPC_ERROR_CREATE(absl::StrFormat(
"Sent message larger than max (%u vs. %d)",
op->payload->send_message.send_message->Length(),
*calld->limits.max_send_size())),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_RESOURCE_EXHAUSTED),
calld->call_combiner);
return;
}
// Inject callback for receiving a message.
if (op->recv_message) {
calld->next_recv_message_ready =
op->payload->recv_message.recv_message_ready;
calld->recv_message = op->payload->recv_message.recv_message;
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
// Inject callback for receiving trailing metadata.
if (op->recv_trailing_metadata) {
calld->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
// Chain to the next filter.
grpc_call_next_op(elem, op);
} }
absl::StatusOr<ServerMessageSizeFilter> ServerMessageSizeFilter::Create( // Constructor for call_data.
const ChannelArgs& args, ChannelFilter::Args) { static grpc_error_handle message_size_init_call_elem(
return ServerMessageSizeFilter(args); grpc_call_element* elem, const grpc_call_element_args* args) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (elem->call_data) call_data(elem, *chand, *args);
return absl::OkStatus();
} }
ArenaPromise<ServerMetadataHandle> ClientMessageSizeFilter::MakeCallPromise( // Destructor for call_data.
CallArgs call_args, NextPromiseFactory next_promise_factory) { static void message_size_destroy_call_elem(
// Get max sizes from channel data, then merge in per-method config values. grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
// Note: Per-method config is only available on the client, so we grpc_closure* /*ignored*/) {
// apply the max request size to the send limit and the max response call_data* calld = static_cast<call_data*>(elem->call_data);
// size to the receive limit. calld->~call_data();
MessageSizeParsedConfig limits = this->limits(); }
const MessageSizeParsedConfig* config_from_call_context =
MessageSizeParsedConfig::GetFromCallContext(
GetContext<grpc_call_context_element>(),
service_config_parser_index_);
if (config_from_call_context != nullptr) {
absl::optional<uint32_t> max_send_size = limits.max_send_size();
absl::optional<uint32_t> max_recv_size = limits.max_recv_size();
if (config_from_call_context->max_send_size().has_value() &&
(!max_send_size.has_value() ||
*config_from_call_context->max_send_size() < *max_send_size)) {
max_send_size = *config_from_call_context->max_send_size();
}
if (config_from_call_context->max_recv_size().has_value() &&
(!max_recv_size.has_value() ||
*config_from_call_context->max_recv_size() < *max_recv_size)) {
max_recv_size = *config_from_call_context->max_recv_size();
}
limits = MessageSizeParsedConfig(max_send_size, max_recv_size);
}
CallBuilder b(limits); // Constructor for channel_data.
b.AddSend(call_args.client_to_server_messages); static grpc_error_handle message_size_init_channel_elem(
b.AddRecv(call_args.server_to_client_messages); grpc_channel_element* elem, grpc_channel_element_args* args) {
return b.Run(std::move(call_args), std::move(next_promise_factory)); GPR_ASSERT(!args->is_last);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (chand) channel_data();
chand->limits = grpc_core::MessageSizeParsedConfig::GetFromChannelArgs(
args->channel_args);
return absl::OkStatus();
} }
ArenaPromise<ServerMetadataHandle> ServerMessageSizeFilter::MakeCallPromise( // Destructor for channel_data.
CallArgs call_args, NextPromiseFactory next_promise_factory) { static void message_size_destroy_channel_elem(grpc_channel_element* elem) {
CallBuilder b(limits()); channel_data* chand = static_cast<channel_data*>(elem->channel_data);
b.AddSend(call_args.server_to_client_messages); chand->~channel_data();
b.AddRecv(call_args.client_to_server_messages);
return b.Run(std::move(call_args), std::move(next_promise_factory));
} }
namespace { const grpc_channel_filter grpc_message_size_filter = {
message_size_start_transport_stream_op_batch,
nullptr,
grpc_channel_next_op,
sizeof(call_data),
message_size_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
message_size_destroy_call_elem,
sizeof(channel_data),
message_size_init_channel_elem,
grpc_channel_stack_no_post_init,
message_size_destroy_channel_elem,
grpc_channel_next_get_info,
"message_size"};
// Used for GRPC_CLIENT_SUBCHANNEL // Used for GRPC_CLIENT_SUBCHANNEL
bool MaybeAddMessageSizeFilterToSubchannel(ChannelStackBuilder* builder) { static bool maybe_add_message_size_filter_subchannel(
grpc_core::ChannelStackBuilder* builder) {
if (builder->channel_args().WantMinimalStack()) { if (builder->channel_args().WantMinimalStack()) {
return true; return true;
} }
builder->PrependFilter(&ClientMessageSizeFilter::kFilter); builder->PrependFilter(&grpc_message_size_filter);
return true; return true;
} }
// Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the // Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the filter
// filter only if message size limits or service config is specified. // only if message size limits or service config is specified.
auto MaybeAddMessageSizeFilter(const grpc_channel_filter* filter) { static bool maybe_add_message_size_filter(
return [filter](ChannelStackBuilder* builder) { grpc_core::ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args(); auto channel_args = builder->channel_args();
if (channel_args.WantMinimalStack()) { if (channel_args.WantMinimalStack()) {
return true;
}
MessageSizeParsedConfig limits =
MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
const bool enable =
limits.max_send_size().has_value() ||
limits.max_recv_size().has_value() ||
channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
if (enable) builder->PrependFilter(filter);
return true; return true;
}; }
grpc_core::MessageSizeParsedConfig limits =
grpc_core::MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
const bool enable =
limits.max_send_size().has_value() ||
limits.max_recv_size().has_value() ||
channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
if (enable) builder->PrependFilter(&grpc_message_size_filter);
return true;
} }
} // namespace namespace grpc_core {
void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) { void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
MessageSizeParser::Register(builder); MessageSizeParser::Register(builder);
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
MaybeAddMessageSizeFilterToSubchannel);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
MaybeAddMessageSizeFilter(&ClientMessageSizeFilter::kFilter));
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
MaybeAddMessageSizeFilter(&ServerMessageSizeFilter::kFilter)); maybe_add_message_size_filter_subchannel);
builder->channel_init()->RegisterStage(GRPC_CLIENT_DIRECT_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter);
builder->channel_init()->RegisterStage(GRPC_SERVER_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter);
} }
} // namespace grpc_core } // namespace grpc_core

@ -24,22 +24,21 @@
#include <memory> #include <memory>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/json/json.h" #include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h" #include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/service_config/service_config_parser.h" #include "src/core/lib/service_config/service_config_parser.h"
#include "src/core/lib/transport/transport.h"
extern const grpc_channel_filter grpc_message_size_filter;
namespace grpc_core { namespace grpc_core {
@ -86,50 +85,6 @@ class MessageSizeParser : public ServiceConfigParser::Parser {
absl::optional<uint32_t> GetMaxRecvSizeFromChannelArgs(const ChannelArgs& args); absl::optional<uint32_t> GetMaxRecvSizeFromChannelArgs(const ChannelArgs& args);
absl::optional<uint32_t> GetMaxSendSizeFromChannelArgs(const ChannelArgs& args); absl::optional<uint32_t> GetMaxSendSizeFromChannelArgs(const ChannelArgs& args);
class MessageSizeFilter : public ChannelFilter {
protected:
explicit MessageSizeFilter(const ChannelArgs& args)
: limits_(MessageSizeParsedConfig::GetFromChannelArgs(args)) {}
class CallBuilder;
const MessageSizeParsedConfig& limits() const { return limits_; }
private:
MessageSizeParsedConfig limits_;
};
class ServerMessageSizeFilter final : public MessageSizeFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ServerMessageSizeFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using MessageSizeFilter::MessageSizeFilter;
};
class ClientMessageSizeFilter final : public MessageSizeFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientMessageSizeFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
const size_t service_config_parser_index_{MessageSizeParser::ParserIndex()};
using MessageSizeFilter::MessageSizeFilter;
};
} // namespace grpc_core } // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H #endif // GRPC_SRC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H

@ -766,8 +766,6 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
nullptr); nullptr);
s->to_read_trailing_md.Clear(); s->to_read_trailing_md.Clear();
s->to_read_trailing_md_filled = false; s->to_read_trailing_md_filled = false;
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata->Set(grpc_core::GrpcStatusFromWire(), true);
// We should schedule the recv_trailing_md_op completion if // We should schedule the recv_trailing_md_op completion if
// 1. this stream is the client-side // 1. this stream is the client-side

@ -56,7 +56,6 @@
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
@ -74,7 +73,6 @@
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/call_trace.h" #include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_fwd.h" #include "src/core/lib/transport/transport_fwd.h"
@ -480,15 +478,7 @@ class ConnectedChannelStream : public Orphanable {
return Match( return Match(
recv_message_state_, [](Idle) -> std::string { return "IDLE"; }, recv_message_state_, [](Idle) -> std::string { return "IDLE"; },
[](Closed) -> std::string { return "CLOSED"; }, [](Closed) -> std::string { return "CLOSED"; },
[](const PendingReceiveMessage& m) -> std::string { [](const PendingReceiveMessage&) -> std::string { return "WAITING"; },
if (m.received) {
return absl::StrCat("RECEIVED_FROM_TRANSPORT:",
m.payload.has_value()
? absl::StrCat(m.payload->Length(), "b")
: "EOS");
}
return "WAITING";
},
[](const absl::optional<MessageHandle>& message) -> std::string { [](const absl::optional<MessageHandle>& message) -> std::string {
return absl::StrCat( return absl::StrCat(
"READY:", message.has_value() "READY:", message.has_value()
@ -580,7 +570,13 @@ class ConnectedChannelStream : public Orphanable {
void RecvMessageBatchDone(grpc_error_handle error) { void RecvMessageBatchDone(grpc_error_handle error) {
{ {
MutexLock lock(mu()); MutexLock lock(mu());
if (absl::holds_alternative<Closed>(recv_message_state_)) { if (error != absl::OkStatus()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[connected] RecvMessageBatchDone: error=%s",
recv_message_waker_.ActivityDebugTag().c_str(),
StatusToString(error).c_str());
}
} else if (absl::holds_alternative<Closed>(recv_message_state_)) {
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"%s[connected] RecvMessageBatchDone: already closed, " "%s[connected] RecvMessageBatchDone: already closed, "
@ -588,21 +584,14 @@ class ConnectedChannelStream : public Orphanable {
recv_message_waker_.ActivityDebugTag().c_str()); recv_message_waker_.ActivityDebugTag().c_str());
} }
} else { } else {
auto pending = if (grpc_call_trace.enabled()) {
absl::get_if<PendingReceiveMessage>(&recv_message_state_);
GPR_ASSERT(pending != nullptr);
if (!error.ok()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[connected] RecvMessageBatchDone: error=%s",
recv_message_waker_.ActivityDebugTag().c_str(),
StatusToString(error).c_str());
}
pending->payload.reset();
} else if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"%s[connected] RecvMessageBatchDone: received message", "%s[connected] RecvMessageBatchDone: received message",
recv_message_waker_.ActivityDebugTag().c_str()); recv_message_waker_.ActivityDebugTag().c_str());
} }
auto pending =
absl::get_if<PendingReceiveMessage>(&recv_message_state_);
GPR_ASSERT(pending != nullptr);
GPR_ASSERT(pending->received == false); GPR_ASSERT(pending->received == false);
pending->received = true; pending->received = true;
} }
@ -682,8 +671,6 @@ class ClientStream : public ConnectedChannelStream {
public: public:
ClientStream(grpc_transport* transport, CallArgs call_args) ClientStream(grpc_transport* transport, CallArgs call_args)
: ConnectedChannelStream(transport), : ConnectedChannelStream(transport),
client_initial_metadata_outstanding_token_(
std::move(call_args.client_initial_metadata_outstanding)),
server_initial_metadata_pipe_(call_args.server_initial_metadata), server_initial_metadata_pipe_(call_args.server_initial_metadata),
client_to_server_messages_(call_args.client_to_server_messages), client_to_server_messages_(call_args.client_to_server_messages),
server_to_client_messages_(call_args.server_to_client_messages), server_to_client_messages_(call_args.server_to_client_messages),
@ -717,14 +704,12 @@ class ClientStream : public ConnectedChannelStream {
nullptr, GetContext<Arena>()); nullptr, GetContext<Arena>());
grpc_transport_set_pops(transport(), stream(), grpc_transport_set_pops(transport(), stream(),
GetContext<CallContext>()->polling_entity()); GetContext<CallContext>()->polling_entity());
memset(&send_metadata_, 0, sizeof(send_metadata_)); memset(&metadata_, 0, sizeof(metadata_));
memset(&recv_metadata_, 0, sizeof(recv_metadata_)); metadata_.send_initial_metadata = true;
send_metadata_.send_initial_metadata = true; metadata_.recv_initial_metadata = true;
recv_metadata_.recv_initial_metadata = true; metadata_.recv_trailing_metadata = true;
recv_metadata_.recv_trailing_metadata = true; metadata_.payload = batch_payload();
send_metadata_.payload = batch_payload(); metadata_.on_complete = &metadata_batch_done_;
recv_metadata_.payload = batch_payload();
send_metadata_.on_complete = &send_metadata_batch_done_;
batch_payload()->send_initial_metadata.send_initial_metadata = batch_payload()->send_initial_metadata.send_initial_metadata =
client_initial_metadata_.get(); client_initial_metadata_.get();
batch_payload()->send_initial_metadata.peer_string = batch_payload()->send_initial_metadata.peer_string =
@ -749,16 +734,9 @@ class ClientStream : public ConnectedChannelStream {
IncrementRefCount("metadata_batch_done"); IncrementRefCount("metadata_batch_done");
IncrementRefCount("initial_metadata_ready"); IncrementRefCount("initial_metadata_ready");
IncrementRefCount("trailing_metadata_ready"); IncrementRefCount("trailing_metadata_ready");
recv_initial_metadata_waker_ = Activity::current()->MakeOwningWaker(); initial_metadata_waker_ = Activity::current()->MakeOwningWaker();
recv_trailing_metadata_waker_ = Activity::current()->MakeOwningWaker(); trailing_metadata_waker_ = Activity::current()->MakeOwningWaker();
send_initial_metadata_waker_ = Activity::current()->MakeOwningWaker(); SchedulePush(&metadata_);
SchedulePush(&send_metadata_);
SchedulePush(&recv_metadata_);
}
if (std::exchange(need_to_clear_client_initial_metadata_outstanding_token_,
false)) {
client_initial_metadata_outstanding_token_.Complete(
client_initial_metadata_send_result_);
} }
if (server_initial_metadata_state_ == if (server_initial_metadata_state_ ==
ServerInitialMetadataState::kReceivedButNotPushed) { ServerInitialMetadataState::kReceivedButNotPushed) {
@ -775,21 +753,9 @@ class ClientStream : public ConnectedChannelStream {
server_initial_metadata_push_promise_.reset(); server_initial_metadata_push_promise_.reset();
} }
} }
if (server_initial_metadata_state_ == ServerInitialMetadataState::kError) {
server_initial_metadata_pipe_->Close();
}
PollSendMessage(client_to_server_messages_, &client_trailing_metadata_); PollSendMessage(client_to_server_messages_, &client_trailing_metadata_);
PollRecvMessage(server_to_client_messages_); PollRecvMessage(server_to_client_messages_);
if (grpc_call_trace.enabled()) { if (server_initial_metadata_state_ == ServerInitialMetadataState::kPushed &&
gpr_log(
GPR_INFO,
"%s[connected] Finishing PollConnectedChannel: requesting metadata",
Activity::current()->DebugTag().c_str());
}
if ((server_initial_metadata_state_ ==
ServerInitialMetadataState::kPushed ||
server_initial_metadata_state_ ==
ServerInitialMetadataState::kError) &&
!IsPromiseReceiving() && !IsPromiseReceiving() &&
std::exchange(queued_trailing_metadata_, false)) { std::exchange(queued_trailing_metadata_, false)) {
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
@ -808,32 +774,18 @@ class ClientStream : public ConnectedChannelStream {
} }
void RecvInitialMetadataReady(grpc_error_handle error) { void RecvInitialMetadataReady(grpc_error_handle error) {
GPR_ASSERT(error == absl::OkStatus());
{ {
MutexLock lock(mu()); MutexLock lock(mu());
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] RecvInitialMetadataReady: error=%s",
recv_initial_metadata_waker_.ActivityDebugTag().c_str(),
error.ToString().c_str());
}
server_initial_metadata_state_ = server_initial_metadata_state_ =
error.ok() ? ServerInitialMetadataState::kReceivedButNotPushed ServerInitialMetadataState::kReceivedButNotPushed;
: ServerInitialMetadataState::kError; initial_metadata_waker_.Wakeup();
recv_initial_metadata_waker_.Wakeup();
} }
Unref("initial_metadata_ready"); Unref("initial_metadata_ready");
} }
void RecvTrailingMetadataReady(grpc_error_handle error) { void RecvTrailingMetadataReady(grpc_error_handle error) {
if (!error.ok()) { GPR_ASSERT(error == absl::OkStatus());
server_trailing_metadata_->Clear();
grpc_status_code status = GRPC_STATUS_UNKNOWN;
std::string message;
grpc_error_get_status(error, Timestamp::InfFuture(), &status, &message,
nullptr, nullptr);
server_trailing_metadata_->Set(GrpcStatusMetadata(), status);
server_trailing_metadata_->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(message));
}
{ {
MutexLock lock(mu()); MutexLock lock(mu());
queued_trailing_metadata_ = true; queued_trailing_metadata_ = true;
@ -842,21 +794,16 @@ class ClientStream : public ConnectedChannelStream {
"%s[connected] RecvTrailingMetadataReady: " "%s[connected] RecvTrailingMetadataReady: "
"queued_trailing_metadata_ " "queued_trailing_metadata_ "
"set to true; active_ops: %s", "set to true; active_ops: %s",
recv_trailing_metadata_waker_.ActivityDebugTag().c_str(), trailing_metadata_waker_.ActivityDebugTag().c_str(),
ActiveOpsString().c_str()); ActiveOpsString().c_str());
} }
recv_trailing_metadata_waker_.Wakeup(); trailing_metadata_waker_.Wakeup();
} }
Unref("trailing_metadata_ready"); Unref("trailing_metadata_ready");
} }
void SendMetadataBatchDone(grpc_error_handle error) { void MetadataBatchDone(grpc_error_handle error) {
{ GPR_ASSERT(error == absl::OkStatus());
MutexLock lock(mu());
need_to_clear_client_initial_metadata_outstanding_token_ = true;
client_initial_metadata_send_result_ = error.ok();
send_initial_metadata_waker_.Wakeup();
}
Unref("metadata_batch_done"); Unref("metadata_batch_done");
} }
@ -876,8 +823,6 @@ class ClientStream : public ConnectedChannelStream {
// has been pushed on the pipe to publish it up the call stack AND removed // has been pushed on the pipe to publish it up the call stack AND removed
// by the call at the top. // by the call at the top.
kPushed, kPushed,
// Received initial metadata with an error status.
kError,
}; };
std::string ActiveOpsString() const override std::string ActiveOpsString() const override
@ -886,10 +831,10 @@ class ClientStream : public ConnectedChannelStream {
if (finished()) ops.push_back("FINISHED"); if (finished()) ops.push_back("FINISHED");
// Outstanding Operations on Transport // Outstanding Operations on Transport
std::vector<std::string> waiting; std::vector<std::string> waiting;
if (recv_initial_metadata_waker_ != Waker()) { if (initial_metadata_waker_ != Waker()) {
waiting.push_back("initial_metadata"); waiting.push_back("initial_metadata");
} }
if (recv_trailing_metadata_waker_ != Waker()) { if (trailing_metadata_waker_ != Waker()) {
waiting.push_back("trailing_metadata"); waiting.push_back("trailing_metadata");
} }
if (!waiting.empty()) { if (!waiting.empty()) {
@ -905,18 +850,6 @@ class ClientStream : public ConnectedChannelStream {
if (!queued.empty()) { if (!queued.empty()) {
ops.push_back(absl::StrCat("queued:", absl::StrJoin(queued, ","))); ops.push_back(absl::StrCat("queued:", absl::StrJoin(queued, ",")));
} }
switch (server_initial_metadata_state_) {
case ServerInitialMetadataState::kNotReceived:
case ServerInitialMetadataState::kReceivedButNotPushed:
case ServerInitialMetadataState::kPushed:
break;
case ServerInitialMetadataState::kPushing:
ops.push_back("server_initial_metadata:PUSHING");
break;
case ServerInitialMetadataState::kError:
ops.push_back("server_initial_metadata:ERROR");
break;
}
// Send message // Send message
std::string send_message_state = SendMessageString(); std::string send_message_state = SendMessageString();
if (send_message_state != "WAITING") { if (send_message_state != "WAITING") {
@ -931,17 +864,11 @@ class ClientStream : public ConnectedChannelStream {
} }
bool requested_metadata_ = false; bool requested_metadata_ = false;
bool need_to_clear_client_initial_metadata_outstanding_token_
ABSL_GUARDED_BY(mu()) = false;
bool client_initial_metadata_send_result_ ABSL_GUARDED_BY(mu());
ServerInitialMetadataState server_initial_metadata_state_ ServerInitialMetadataState server_initial_metadata_state_
ABSL_GUARDED_BY(mu()) = ServerInitialMetadataState::kNotReceived; ABSL_GUARDED_BY(mu()) = ServerInitialMetadataState::kNotReceived;
bool queued_trailing_metadata_ ABSL_GUARDED_BY(mu()) = false; bool queued_trailing_metadata_ ABSL_GUARDED_BY(mu()) = false;
Waker recv_initial_metadata_waker_ ABSL_GUARDED_BY(mu()); Waker initial_metadata_waker_ ABSL_GUARDED_BY(mu());
Waker send_initial_metadata_waker_ ABSL_GUARDED_BY(mu()); Waker trailing_metadata_waker_ ABSL_GUARDED_BY(mu());
Waker recv_trailing_metadata_waker_ ABSL_GUARDED_BY(mu());
ClientInitialMetadataOutstandingToken
client_initial_metadata_outstanding_token_;
PipeSender<ServerMetadataHandle>* server_initial_metadata_pipe_; PipeSender<ServerMetadataHandle>* server_initial_metadata_pipe_;
PipeReceiver<MessageHandle>* client_to_server_messages_; PipeReceiver<MessageHandle>* client_to_server_messages_;
PipeSender<MessageHandle>* server_to_client_messages_; PipeSender<MessageHandle>* server_to_client_messages_;
@ -957,10 +884,9 @@ class ClientStream : public ConnectedChannelStream {
ServerMetadataHandle server_trailing_metadata_; ServerMetadataHandle server_trailing_metadata_;
absl::optional<PipeSender<ServerMetadataHandle>::PushType> absl::optional<PipeSender<ServerMetadataHandle>::PushType>
server_initial_metadata_push_promise_; server_initial_metadata_push_promise_;
grpc_transport_stream_op_batch send_metadata_; grpc_transport_stream_op_batch metadata_;
grpc_transport_stream_op_batch recv_metadata_; grpc_closure metadata_batch_done_ =
grpc_closure send_metadata_batch_done_ = MakeMemberClosure<ClientStream, &ClientStream::MetadataBatchDone>(
MakeMemberClosure<ClientStream, &ClientStream::SendMetadataBatchDone>(
this, DEBUG_LOCATION); this, DEBUG_LOCATION);
}; };
@ -1022,7 +948,6 @@ class ServerStream final : public ConnectedChannelStream {
gim.client_initial_metadata.get(); gim.client_initial_metadata.get();
batch_payload()->recv_initial_metadata.recv_initial_metadata_ready = batch_payload()->recv_initial_metadata.recv_initial_metadata_ready =
&gim.recv_initial_metadata_ready; &gim.recv_initial_metadata_ready;
IncrementRefCount("RecvInitialMetadata");
SchedulePush(&gim.recv_initial_metadata); SchedulePush(&gim.recv_initial_metadata);
// Fetch trailing metadata (to catch cancellations) // Fetch trailing metadata (to catch cancellations)
@ -1040,9 +965,8 @@ class ServerStream final : public ConnectedChannelStream {
&GetContext<CallContext>()->call_stats()->transport_stream_stats; &GetContext<CallContext>()->call_stats()->transport_stream_stats;
batch_payload()->recv_trailing_metadata.recv_trailing_metadata_ready = batch_payload()->recv_trailing_metadata.recv_trailing_metadata_ready =
&gtm.recv_trailing_metadata_ready; &gtm.recv_trailing_metadata_ready;
gtm.waker = Activity::current()->MakeOwningWaker();
IncrementRefCount("RecvTrailingMetadata");
SchedulePush(&gtm.recv_trailing_metadata); SchedulePush(&gtm.recv_trailing_metadata);
gtm.waker = Activity::current()->MakeOwningWaker();
} }
Poll<ServerMetadataHandle> PollOnce() { Poll<ServerMetadataHandle> PollOnce() {
@ -1071,7 +995,6 @@ class ServerStream final : public ConnectedChannelStream {
.emplace<ServerMetadataHandle>(std::move(**md)) .emplace<ServerMetadataHandle>(std::move(**md))
.get(); .get();
batch_payload()->send_initial_metadata.peer_string = nullptr; batch_payload()->send_initial_metadata.peer_string = nullptr;
IncrementRefCount("SendInitialMetadata");
SchedulePush(&send_initial_metadata_); SchedulePush(&send_initial_metadata_);
return true; return true;
} else { } else {
@ -1116,7 +1039,6 @@ class ServerStream final : public ConnectedChannelStream {
incoming_messages_ = &pipes_.client_to_server.sender; incoming_messages_ = &pipes_.client_to_server.sender;
auto promise = p->next_promise_factory(CallArgs{ auto promise = p->next_promise_factory(CallArgs{
std::move(p->client_initial_metadata), std::move(p->client_initial_metadata),
ClientInitialMetadataOutstandingToken::Empty(),
&pipes_.server_initial_metadata.sender, &pipes_.server_initial_metadata.sender,
&pipes_.client_to_server.receiver, &pipes_.server_to_client.sender}); &pipes_.client_to_server.receiver, &pipes_.server_to_client.sender});
call_state_.emplace<MessageLoop>( call_state_.emplace<MessageLoop>(
@ -1171,7 +1093,6 @@ class ServerStream final : public ConnectedChannelStream {
->as_string_view()), ->as_string_view()),
StatusIntProperty::kRpcStatus, status_code); StatusIntProperty::kRpcStatus, status_code);
} }
IncrementRefCount("SendTrailingMetadata");
SchedulePush(&op); SchedulePush(&op);
} }
} }
@ -1267,7 +1188,6 @@ class ServerStream final : public ConnectedChannelStream {
std::move(getting.next_promise_factory)}; std::move(getting.next_promise_factory)};
call_state_.emplace<GotInitialMetadata>(std::move(got)); call_state_.emplace<GotInitialMetadata>(std::move(got));
waker.Wakeup(); waker.Wakeup();
Unref("RecvInitialMetadata");
} }
void SendTrailingMetadataDone(absl::Status result) { void SendTrailingMetadataDone(absl::Status result) {
@ -1280,19 +1200,16 @@ class ServerStream final : public ConnectedChannelStream {
waker.ActivityDebugTag().c_str(), result.ToString().c_str(), waker.ActivityDebugTag().c_str(), result.ToString().c_str(),
completing.sent ? "true" : "false", md->DebugString().c_str()); completing.sent ? "true" : "false", md->DebugString().c_str());
} }
md->Set(GrpcStatusFromWire(), completing.sent);
if (!result.ok()) { if (!result.ok()) {
md->Clear(); md->Clear();
md->Set(GrpcStatusMetadata(), md->Set(GrpcStatusMetadata(),
static_cast<grpc_status_code>(result.code())); static_cast<grpc_status_code>(result.code()));
md->Set(GrpcMessageMetadata(), Slice::FromCopiedString(result.message())); md->Set(GrpcMessageMetadata(), Slice::FromCopiedString(result.message()));
md->Set(GrpcCallWasCancelled(), true); md->Set(GrpcStatusFromWire(), false);
}
if (!md->get(GrpcCallWasCancelled()).has_value()) {
md->Set(GrpcCallWasCancelled(), !completing.sent);
} }
call_state_.emplace<Complete>(Complete{std::move(md)}); call_state_.emplace<Complete>(Complete{std::move(md)});
waker.Wakeup(); waker.Wakeup();
Unref("SendTrailingMetadata");
} }
std::string ActiveOpsString() const override std::string ActiveOpsString() const override
@ -1344,7 +1261,7 @@ class ServerStream final : public ConnectedChannelStream {
return absl::StrJoin(ops, " "); return absl::StrJoin(ops, " ");
} }
void SendInitialMetadataDone() { Unref("SendInitialMetadata"); } void SendInitialMetadataDone() {}
void RecvTrailingMetadataReady(absl::Status error) { void RecvTrailingMetadataReady(absl::Status error) {
MutexLock lock(mu()); MutexLock lock(mu());
@ -1368,7 +1285,6 @@ class ServerStream final : public ConnectedChannelStream {
client_trailing_metadata_state_.emplace<GotClientHalfClose>( client_trailing_metadata_state_.emplace<GotClientHalfClose>(
GotClientHalfClose{error}); GotClientHalfClose{error});
waker.Wakeup(); waker.Wakeup();
Unref("RecvTrailingMetadata");
} }
struct Pipes { struct Pipes {

@ -16,8 +16,6 @@
#include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/channel/promise_based_filter.h"
#include <inttypes.h>
#include <algorithm> #include <algorithm>
#include <initializer_list> #include <initializer_list>
#include <memory> #include <memory>
@ -217,7 +215,7 @@ void BaseCallData::CapturedBatch::ResumeWith(Flusher* releaser) {
// refcnt==0 ==> cancelled // refcnt==0 ==> cancelled
if (grpc_trace_channel.enabled()) { if (grpc_trace_channel.enabled()) {
gpr_log(GPR_INFO, "%sRESUME BATCH REQUEST CANCELLED", gpr_log(GPR_INFO, "%sRESUME BATCH REQUEST CANCELLED",
releaser->call()->DebugTag().c_str()); Activity::current()->DebugTag().c_str());
} }
return; return;
} }
@ -241,10 +239,6 @@ void BaseCallData::CapturedBatch::CancelWith(grpc_error_handle error,
auto* batch = std::exchange(batch_, nullptr); auto* batch = std::exchange(batch_, nullptr);
GPR_ASSERT(batch != nullptr); GPR_ASSERT(batch != nullptr);
uintptr_t& refcnt = *RefCountField(batch); uintptr_t& refcnt = *RefCountField(batch);
gpr_log(GPR_DEBUG, "%sCancelWith: %p refs=%" PRIdPTR " err=%s [%s]",
releaser->call()->DebugTag().c_str(), batch, refcnt,
error.ToString().c_str(),
grpc_transport_stream_op_batch_string(batch).c_str());
if (refcnt == 0) { if (refcnt == 0) {
// refcnt==0 ==> cancelled // refcnt==0 ==> cancelled
if (grpc_trace_channel.enabled()) { if (grpc_trace_channel.enabled()) {
@ -331,8 +325,6 @@ const char* BaseCallData::SendMessage::StateString(State state) {
return "CANCELLED"; return "CANCELLED";
case State::kCancelledButNotYetPolled: case State::kCancelledButNotYetPolled:
return "CANCELLED_BUT_NOT_YET_POLLED"; return "CANCELLED_BUT_NOT_YET_POLLED";
case State::kCancelledButNoStatus:
return "CANCELLED_BUT_NO_STATUS";
} }
return "UNKNOWN"; return "UNKNOWN";
} }
@ -357,7 +349,6 @@ void BaseCallData::SendMessage::StartOp(CapturedBatch batch) {
Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_))); Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_)));
case State::kCancelled: case State::kCancelled:
case State::kCancelledButNotYetPolled: case State::kCancelledButNotYetPolled:
case State::kCancelledButNoStatus:
return; return;
} }
batch_ = batch; batch_ = batch;
@ -385,7 +376,6 @@ void BaseCallData::SendMessage::GotPipe(T* pipe_end) {
case State::kForwardedBatch: case State::kForwardedBatch:
case State::kBatchCompleted: case State::kBatchCompleted:
case State::kPushedToPipe: case State::kPushedToPipe:
case State::kCancelledButNoStatus:
Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_))); Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_)));
case State::kCancelled: case State::kCancelled:
case State::kCancelledButNotYetPolled: case State::kCancelledButNotYetPolled:
@ -401,7 +391,6 @@ bool BaseCallData::SendMessage::IsIdle() const {
case State::kForwardedBatch: case State::kForwardedBatch:
case State::kCancelled: case State::kCancelled:
case State::kCancelledButNotYetPolled: case State::kCancelledButNotYetPolled:
case State::kCancelledButNoStatus:
return true; return true;
case State::kGotBatchNoPipe: case State::kGotBatchNoPipe:
case State::kGotBatch: case State::kGotBatch:
@ -430,7 +419,6 @@ void BaseCallData::SendMessage::OnComplete(absl::Status status) {
break; break;
case State::kCancelled: case State::kCancelled:
case State::kCancelledButNotYetPolled: case State::kCancelledButNotYetPolled:
case State::kCancelledButNoStatus:
flusher.AddClosure(intercepted_on_complete_, status, flusher.AddClosure(intercepted_on_complete_, status,
"forward after cancel"); "forward after cancel");
break; break;
@ -455,14 +443,10 @@ void BaseCallData::SendMessage::Done(const ServerMetadata& metadata,
case State::kCancelledButNotYetPolled: case State::kCancelledButNotYetPolled:
break; break;
case State::kInitial: case State::kInitial:
state_ = State::kCancelled;
break;
case State::kIdle: case State::kIdle:
case State::kForwardedBatch: case State::kForwardedBatch:
state_ = State::kCancelledButNotYetPolled; state_ = State::kCancelledButNotYetPolled;
if (base_->is_current()) base_->ForceImmediateRepoll();
break; break;
case State::kCancelledButNoStatus:
case State::kGotBatchNoPipe: case State::kGotBatchNoPipe:
case State::kGotBatch: { case State::kGotBatch: {
std::string temp; std::string temp;
@ -481,7 +465,6 @@ void BaseCallData::SendMessage::Done(const ServerMetadata& metadata,
push_.reset(); push_.reset();
next_.reset(); next_.reset();
state_ = State::kCancelledButNotYetPolled; state_ = State::kCancelledButNotYetPolled;
if (base_->is_current()) base_->ForceImmediateRepoll();
break; break;
} }
} }
@ -500,7 +483,6 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher,
case State::kIdle: case State::kIdle:
case State::kGotBatchNoPipe: case State::kGotBatchNoPipe:
case State::kCancelled: case State::kCancelled:
case State::kCancelledButNoStatus:
break; break;
case State::kCancelledButNotYetPolled: case State::kCancelledButNotYetPolled:
interceptor()->Push()->Close(); interceptor()->Push()->Close();
@ -542,18 +524,13 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher,
"result.has_value=%s", "result.has_value=%s",
base_->LogTag().c_str(), p->has_value() ? "true" : "false"); base_->LogTag().c_str(), p->has_value() ? "true" : "false");
} }
if (p->has_value()) { GPR_ASSERT(p->has_value());
batch_->payload->send_message.send_message->Swap((**p)->payload()); batch_->payload->send_message.send_message->Swap((**p)->payload());
batch_->payload->send_message.flags = (**p)->flags(); batch_->payload->send_message.flags = (**p)->flags();
state_ = State::kForwardedBatch; state_ = State::kForwardedBatch;
batch_.ResumeWith(flusher); batch_.ResumeWith(flusher);
next_.reset(); next_.reset();
if (!absl::holds_alternative<Pending>((*push_)())) push_.reset(); if (!absl::holds_alternative<Pending>((*push_)())) push_.reset();
} else {
state_ = State::kCancelledButNoStatus;
next_.reset();
push_.reset();
}
} }
} break; } break;
case State::kForwardedBatch: case State::kForwardedBatch:
@ -1113,14 +1090,11 @@ class ClientCallData::PollContext {
// Poll the promise once since we're waiting for it. // Poll the promise once since we're waiting for it.
Poll<ServerMetadataHandle> poll = self_->promise_(); Poll<ServerMetadataHandle> poll = self_->promise_();
if (grpc_trace_channel.enabled()) { if (grpc_trace_channel.enabled()) {
gpr_log(GPR_INFO, "%s ClientCallData.PollContext.Run: poll=%s; %s", gpr_log(GPR_INFO, "%s ClientCallData.PollContext.Run: poll=%s",
self_->LogTag().c_str(), self_->LogTag().c_str(),
PollToString(poll, PollToString(poll, [](const ServerMetadataHandle& h) {
[](const ServerMetadataHandle& h) { return h->DebugString();
return h->DebugString(); }).c_str());
})
.c_str(),
self_->DebugString().c_str());
} }
if (auto* r = absl::get_if<ServerMetadataHandle>(&poll)) { if (auto* r = absl::get_if<ServerMetadataHandle>(&poll)) {
auto md = std::move(*r); auto md = std::move(*r);
@ -1300,11 +1274,7 @@ ClientCallData::ClientCallData(grpc_call_element* elem,
[args]() { [args]() {
return args->arena->New<ReceiveInterceptor>(args->arena); return args->arena->New<ReceiveInterceptor>(args->arena);
}, },
[args]() { return args->arena->New<SendInterceptor>(args->arena); }), [args]() { return args->arena->New<SendInterceptor>(args->arena); }) {
initial_metadata_outstanding_token_(
(flags & kFilterIsLast) != 0
? ClientInitialMetadataOutstandingToken::New(arena())
: ClientInitialMetadataOutstandingToken::Empty()) {
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReadyCallback, this, RecvTrailingMetadataReadyCallback, this,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
@ -1573,7 +1543,6 @@ void ClientCallData::StartPromise(Flusher* flusher) {
promise_ = filter->MakeCallPromise( promise_ = filter->MakeCallPromise(
CallArgs{WrapMetadata(send_initial_metadata_batch_->payload CallArgs{WrapMetadata(send_initial_metadata_batch_->payload
->send_initial_metadata.send_initial_metadata), ->send_initial_metadata.send_initial_metadata),
std::move(initial_metadata_outstanding_token_),
server_initial_metadata_pipe() == nullptr server_initial_metadata_pipe() == nullptr
? nullptr ? nullptr
: &server_initial_metadata_pipe()->sender, : &server_initial_metadata_pipe()->sender,
@ -1894,15 +1863,8 @@ struct ServerCallData::SendInitialMetadata {
class ServerCallData::PollContext { class ServerCallData::PollContext {
public: public:
explicit PollContext(ServerCallData* self, Flusher* flusher, explicit PollContext(ServerCallData* self, Flusher* flusher)
DebugLocation created = DebugLocation()) : self_(self), flusher_(flusher) {
: self_(self), flusher_(flusher), created_(created) {
if (self_->poll_ctx_ != nullptr) {
Crash(absl::StrCat(
"PollContext: disallowed recursion. New: ", created_.file(), ":",
created_.line(), "; Old: ", self_->poll_ctx_->created_.file(), ":",
self_->poll_ctx_->created_.line()));
}
GPR_ASSERT(self_->poll_ctx_ == nullptr); GPR_ASSERT(self_->poll_ctx_ == nullptr);
self_->poll_ctx_ = this; self_->poll_ctx_ = this;
scoped_activity_.Init(self_); scoped_activity_.Init(self_);
@ -1948,7 +1910,6 @@ class ServerCallData::PollContext {
Flusher* const flusher_; Flusher* const flusher_;
bool repoll_ = false; bool repoll_ = false;
bool have_scoped_activity_; bool have_scoped_activity_;
GPR_NO_UNIQUE_ADDRESS DebugLocation created_;
}; };
const char* ServerCallData::StateString(RecvInitialState state) { const char* ServerCallData::StateString(RecvInitialState state) {
@ -2118,10 +2079,7 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
switch (send_trailing_state_) { switch (send_trailing_state_) {
case SendTrailingState::kInitial: case SendTrailingState::kInitial:
send_trailing_metadata_batch_ = batch; send_trailing_metadata_batch_ = batch;
if (receive_message() != nullptr && if (receive_message() != nullptr) {
batch->payload->send_trailing_metadata.send_trailing_metadata
->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) {
receive_message()->Done( receive_message()->Done(
*batch->payload->send_trailing_metadata.send_trailing_metadata, *batch->payload->send_trailing_metadata.send_trailing_metadata,
&flusher); &flusher);
@ -2178,12 +2136,9 @@ void ServerCallData::Completed(grpc_error_handle error, Flusher* flusher) {
case SendTrailingState::kForwarded: case SendTrailingState::kForwarded:
send_trailing_state_ = SendTrailingState::kCancelled; send_trailing_state_ = SendTrailingState::kCancelled;
if (!error.ok()) { if (!error.ok()) {
call_stack()->IncrementRefCount();
auto* batch = grpc_make_transport_stream_op( auto* batch = grpc_make_transport_stream_op(
NewClosure([call_combiner = call_combiner(), NewClosure([call_combiner = call_combiner()](absl::Status) {
call_stack = call_stack()](absl::Status) {
GRPC_CALL_COMBINER_STOP(call_combiner, "done-cancel"); GRPC_CALL_COMBINER_STOP(call_combiner, "done-cancel");
call_stack->Unref();
})); }));
batch->cancel_stream = true; batch->cancel_stream = true;
batch->payload->cancel_stream.cancel_error = error; batch->payload->cancel_stream.cancel_error = error;
@ -2357,7 +2312,6 @@ void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) {
FakeActivity(this).Run([this, filter] { FakeActivity(this).Run([this, filter] {
promise_ = filter->MakeCallPromise( promise_ = filter->MakeCallPromise(
CallArgs{WrapMetadata(recv_initial_metadata_), CallArgs{WrapMetadata(recv_initial_metadata_),
ClientInitialMetadataOutstandingToken::Empty(),
server_initial_metadata_pipe() == nullptr server_initial_metadata_pipe() == nullptr
? nullptr ? nullptr
: &server_initial_metadata_pipe()->sender, : &server_initial_metadata_pipe()->sender,
@ -2459,14 +2413,9 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
(send_trailing_metadata_batch_->send_message && (send_trailing_metadata_batch_->send_message &&
send_message()->IsForwarded()))) { send_message()->IsForwarded()))) {
send_trailing_state_ = SendTrailingState::kQueued; send_trailing_state_ = SendTrailingState::kQueued;
if (send_trailing_metadata_batch_->payload->send_trailing_metadata send_message()->Done(*send_trailing_metadata_batch_->payload
.send_trailing_metadata->get(GrpcStatusMetadata()) ->send_trailing_metadata.send_trailing_metadata,
.value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) { flusher);
send_message()->Done(
*send_trailing_metadata_batch_->payload->send_trailing_metadata
.send_trailing_metadata,
flusher);
}
} }
} }
if (receive_message() != nullptr) { if (receive_message() != nullptr) {

@ -218,11 +218,7 @@ class BaseCallData : public Activity, private Wakeable {
void Resume(grpc_transport_stream_op_batch* batch) { void Resume(grpc_transport_stream_op_batch* batch) {
GPR_ASSERT(!call_->is_last()); GPR_ASSERT(!call_->is_last());
if (batch->HasOp()) { release_.push_back(batch);
release_.push_back(batch);
} else if (batch->on_complete != nullptr) {
Complete(batch);
}
} }
void Cancel(grpc_transport_stream_op_batch* batch, void Cancel(grpc_transport_stream_op_batch* batch,
@ -241,8 +237,6 @@ class BaseCallData : public Activity, private Wakeable {
call_closures_.Add(closure, error, reason); call_closures_.Add(closure, error, reason);
} }
BaseCallData* call() const { return call_; }
private: private:
absl::InlinedVector<grpc_transport_stream_op_batch*, 1> release_; absl::InlinedVector<grpc_transport_stream_op_batch*, 1> release_;
CallCombinerClosureList call_closures_; CallCombinerClosureList call_closures_;
@ -404,8 +398,6 @@ class BaseCallData : public Activity, private Wakeable {
kCancelledButNotYetPolled, kCancelledButNotYetPolled,
// We're done. // We're done.
kCancelled, kCancelled,
// We're done, but we haven't gotten a status yet
kCancelledButNoStatus,
}; };
static const char* StateString(State); static const char* StateString(State);
@ -672,8 +664,6 @@ class ClientCallData : public BaseCallData {
RecvTrailingState recv_trailing_state_ = RecvTrailingState::kInitial; RecvTrailingState recv_trailing_state_ = RecvTrailingState::kInitial;
// Polling related data. Non-null if we're actively polling // Polling related data. Non-null if we're actively polling
PollContext* poll_ctx_ = nullptr; PollContext* poll_ctx_ = nullptr;
// Initial metadata outstanding token
ClientInitialMetadataOutstandingToken initial_metadata_outstanding_token_;
}; };
class ServerCallData : public BaseCallData { class ServerCallData : public BaseCallData {

@ -171,8 +171,8 @@ class CallCombinerClosureList {
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"CallCombinerClosureList executing closure while already " "CallCombinerClosureList executing closure while already "
"holding call_combiner %p: closure=%s error=%s reason=%s", "holding call_combiner %p: closure=%p error=%s reason=%s",
call_combiner, closures_[0].closure->DebugString().c_str(), call_combiner, closures_[0].closure,
StatusToString(closures_[0].error).c_str(), closures_[0].reason); StatusToString(closures_[0].error).c_str(), closures_[0].reason);
} }
// This will release the call combiner. // This will release the call combiner.

@ -17,7 +17,6 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <memory>
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
@ -107,9 +106,6 @@ class Curried {
private: private:
GPR_NO_UNIQUE_ADDRESS F f_; GPR_NO_UNIQUE_ADDRESS F f_;
GPR_NO_UNIQUE_ADDRESS Arg arg_; GPR_NO_UNIQUE_ADDRESS Arg arg_;
#ifndef NDEBUG
std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
#endif
}; };
// Promote a callable(A) -> T | Poll<T> to a PromiseFactory(A) -> Promise<T> by // Promote a callable(A) -> T | Poll<T> to a PromiseFactory(A) -> Promise<T> by

@ -140,8 +140,7 @@ class InterceptorList {
async_resolution_.space.get()); async_resolution_.space.get());
async_resolution_.current_factory = async_resolution_.current_factory =
async_resolution_.current_factory->next(); async_resolution_.current_factory->next();
if (!p->has_value()) async_resolution_.current_factory = nullptr; if (async_resolution_.current_factory == nullptr || !p->has_value()) {
if (async_resolution_.current_factory == nullptr) {
return std::move(*p); return std::move(*p);
} }
async_resolution_.current_factory->MakePromise( async_resolution_.current_factory->MakePromise(

@ -89,8 +89,6 @@ class Latch {
waiter_.Wake(); waiter_.Wake();
} }
bool is_set() const { return has_value_; }
private: private:
std::string DebugTag() { std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " LATCH[0x", return absl::StrCat(Activity::current()->DebugTag(), " LATCH[0x",
@ -167,7 +165,7 @@ class Latch<void> {
private: private:
std::string DebugTag() { std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x", return absl::StrCat(Activity::current()->DebugTag(), " LATCH[0x",
reinterpret_cast<uintptr_t>(this), "]: "); reinterpret_cast<uintptr_t>(this), "]: ");
} }

@ -18,12 +18,12 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <string.h>
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <cstddef>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <string>
#include <utility> #include <utility>
#include "absl/status/status.h" #include "absl/status/status.h"
@ -41,7 +41,6 @@
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/status_helper.h"
@ -58,7 +57,6 @@
#include "src/core/lib/security/transport/auth_filters.h" // IWYU pragma: keep #include "src/core/lib/security/transport/auth_filters.h" // IWYU pragma: keep
#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
@ -122,28 +120,12 @@ class ServerAuthFilter::RunApplicationCode {
// memory later // memory later
RunApplicationCode(ServerAuthFilter* filter, CallArgs call_args) RunApplicationCode(ServerAuthFilter* filter, CallArgs call_args)
: state_(GetContext<Arena>()->ManagedNew<State>(std::move(call_args))) { : state_(GetContext<Arena>()->ManagedNew<State>(std::move(call_args))) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_ERROR,
"%s[server-auth]: Delegate to application: filter=%p this=%p "
"auth_ctx=%p",
Activity::current()->DebugTag().c_str(), filter, this,
filter->auth_context_.get());
}
filter->server_credentials_->auth_metadata_processor().process( filter->server_credentials_->auth_metadata_processor().process(
filter->server_credentials_->auth_metadata_processor().state, filter->server_credentials_->auth_metadata_processor().state,
filter->auth_context_.get(), state_->md.metadata, state_->md.count, filter->auth_context_.get(), state_->md.metadata, state_->md.count,
OnMdProcessingDone, state_); OnMdProcessingDone, state_);
} }
RunApplicationCode(const RunApplicationCode&) = delete;
RunApplicationCode& operator=(const RunApplicationCode&) = delete;
RunApplicationCode(RunApplicationCode&& other) noexcept
: state_(std::exchange(other.state_, nullptr)) {}
RunApplicationCode& operator=(RunApplicationCode&& other) noexcept {
state_ = std::exchange(other.state_, nullptr);
return *this;
}
Poll<absl::StatusOr<CallArgs>> operator()() { Poll<absl::StatusOr<CallArgs>> operator()() {
if (state_->done.load(std::memory_order_acquire)) { if (state_->done.load(std::memory_order_acquire)) {
return Poll<absl::StatusOr<CallArgs>>(std::move(state_->call_args)); return Poll<absl::StatusOr<CallArgs>>(std::move(state_->call_args));

@ -480,7 +480,7 @@ int grpc_slice_slice(grpc_slice haystack, grpc_slice needle) {
} }
const uint8_t* last = haystack_bytes + haystack_len - needle_len; const uint8_t* last = haystack_bytes + haystack_len - needle_len;
for (const uint8_t* cur = haystack_bytes; cur <= last; ++cur) { for (const uint8_t* cur = haystack_bytes; cur != last; ++cur) {
if (0 == memcmp(cur, needle_bytes, needle_len)) { if (0 == memcmp(cur, needle_bytes, needle_len)) {
return static_cast<int>(cur - haystack_bytes); return static_cast<int>(cur - haystack_bytes);
} }

File diff suppressed because it is too large Load Diff

@ -79,7 +79,6 @@ ArenaPromise<ServerMetadataHandle> LameClientFilter::MakeCallPromise(
if (args.server_to_client_messages != nullptr) { if (args.server_to_client_messages != nullptr) {
args.server_to_client_messages->Close(); args.server_to_client_messages->Close();
} }
args.client_initial_metadata_outstanding.Complete(true);
return Immediate(ServerMetadataFromStatus(error_)); return Immediate(ServerMetadataFromStatus(error_));
} }

@ -396,15 +396,6 @@ struct GrpcStatusFromWire {
static absl::string_view DisplayValue(bool x) { return x ? "true" : "false"; } static absl::string_view DisplayValue(bool x) { return x ? "true" : "false"; }
}; };
// Annotation to denote that this call qualifies for cancelled=1 for the
// RECV_CLOSE_ON_SERVER op
struct GrpcCallWasCancelled {
static absl::string_view DebugKey() { return "GrpcCallWasCancelled"; }
static constexpr bool kRepeatable = false;
using ValueType = bool;
static absl::string_view DisplayValue(bool x) { return x ? "true" : "false"; }
};
// Annotation added by client surface code to denote wait-for-ready state // Annotation added by client surface code to denote wait-for-ready state
struct WaitForReady { struct WaitForReady {
struct ValueType { struct ValueType {
@ -1336,8 +1327,7 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
// Non-encodable things // Non-encodable things
grpc_core::GrpcStreamNetworkState, grpc_core::PeerString, grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext, grpc_core::GrpcStatusFromWire, grpc_core::GrpcStatusContext, grpc_core::GrpcStatusFromWire,
grpc_core::GrpcCallWasCancelled, grpc_core::WaitForReady, grpc_core::WaitForReady, grpc_core::GrpcTrailersOnly>;
grpc_core::GrpcTrailersOnly>;
struct grpc_metadata_batch : public grpc_metadata_batch_base { struct grpc_metadata_batch : public grpc_metadata_batch_base {
using grpc_metadata_batch_base::grpc_metadata_batch_base; using grpc_metadata_batch_base::grpc_metadata_batch_base;

@ -27,7 +27,6 @@
#include <functional> #include <functional>
#include <string> #include <string>
#include <type_traits>
#include <utility> #include <utility>
#include "absl/status/status.h" #include "absl/status/status.h"
@ -55,7 +54,6 @@
#include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h" #include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/status.h" #include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/pipe.h"
#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
@ -146,70 +144,11 @@ struct StatusCastImpl<ServerMetadataHandle, absl::Status&> {
} }
}; };
// Move only type that tracks call startup.
// Allows observation of when client_initial_metadata has been processed by the
// end of the local call stack.
// Interested observers can call Wait() to obtain a promise that will resolve
// when all local client_initial_metadata processing has completed.
// The result of this token is either true on successful completion, or false
// if the metadata was not sent.
// To set a successful completion, call Complete(true). For failure, call
// Complete(false).
// If Complete is not called, the destructor of a still held token will complete
// with failure.
// Transports should hold this token until client_initial_metadata has passed
// any flow control (eg MAX_CONCURRENT_STREAMS for http2).
class ClientInitialMetadataOutstandingToken {
public:
static ClientInitialMetadataOutstandingToken Empty() {
return ClientInitialMetadataOutstandingToken();
}
static ClientInitialMetadataOutstandingToken New(
Arena* arena = GetContext<Arena>()) {
ClientInitialMetadataOutstandingToken token;
token.latch_ = arena->New<Latch<bool>>();
return token;
}
ClientInitialMetadataOutstandingToken(
const ClientInitialMetadataOutstandingToken&) = delete;
ClientInitialMetadataOutstandingToken& operator=(
const ClientInitialMetadataOutstandingToken&) = delete;
ClientInitialMetadataOutstandingToken(
ClientInitialMetadataOutstandingToken&& other) noexcept
: latch_(std::exchange(other.latch_, nullptr)) {}
ClientInitialMetadataOutstandingToken& operator=(
ClientInitialMetadataOutstandingToken&& other) noexcept {
latch_ = std::exchange(other.latch_, nullptr);
return *this;
}
~ClientInitialMetadataOutstandingToken() {
if (latch_ != nullptr) latch_->Set(false);
}
void Complete(bool success) { std::exchange(latch_, nullptr)->Set(success); }
// Returns a promise that will resolve when this object (or its moved-from
// ancestor) is dropped.
auto Wait() { return latch_->Wait(); }
private:
ClientInitialMetadataOutstandingToken() = default;
Latch<bool>* latch_ = nullptr;
};
using ClientInitialMetadataOutstandingTokenWaitType =
decltype(std::declval<ClientInitialMetadataOutstandingToken>().Wait());
struct CallArgs { struct CallArgs {
// Initial metadata from the client to the server. // Initial metadata from the client to the server.
// During promise setup this can be manipulated by filters (and then // During promise setup this can be manipulated by filters (and then
// passed on to the next filter). // passed on to the next filter).
ClientMetadataHandle client_initial_metadata; ClientMetadataHandle client_initial_metadata;
// Token indicating that client_initial_metadata is still being processed.
// This should be moved around and only destroyed when the transport is
// satisfied that the metadata has passed any flow control measures it has.
ClientInitialMetadataOutstandingToken client_initial_metadata_outstanding;
// Initial metadata from the server to the client. // Initial metadata from the server to the client.
// Set once when it's available. // Set once when it's available.
// During promise setup filters can substitute their own latch for this // During promise setup filters can substitute their own latch for this
@ -392,12 +331,6 @@ struct grpc_transport_stream_op_batch {
/// Is this stream traced /// Is this stream traced
bool is_traced : 1; bool is_traced : 1;
bool HasOp() const {
return send_initial_metadata || send_trailing_metadata || send_message ||
recv_initial_metadata || recv_message || recv_trailing_metadata ||
cancel_stream;
}
//************************************************************************** //**************************************************************************
// remaining fields are initialized and used at the discretion of the // remaining fields are initialized and used at the discretion of the
// current handler of the op // current handler of the op

@ -45,6 +45,8 @@
#define SERVER_KEY_PATH "src/core/tsi/test_creds/server1.key" #define SERVER_KEY_PATH "src/core/tsi/test_creds/server1.key"
static const char oauth2_md[] = "Bearer aaslkfjs424535asdf"; static const char oauth2_md[] = "Bearer aaslkfjs424535asdf";
static const char* client_identity_property_name = "smurf_name";
static const char* client_identity = "Brainy Smurf";
struct fullstack_secure_fixture_data { struct fullstack_secure_fixture_data {
std::string localaddr; std::string localaddr;
@ -68,7 +70,7 @@ typedef struct {
size_t pseudo_refcount; size_t pseudo_refcount;
} test_processor_state; } test_processor_state;
static void process_oauth2_success(void* state, grpc_auth_context*, static void process_oauth2_success(void* state, grpc_auth_context* ctx,
const grpc_metadata* md, size_t md_count, const grpc_metadata* md, size_t md_count,
grpc_process_auth_metadata_done_cb cb, grpc_process_auth_metadata_done_cb cb,
void* user_data) { void* user_data) {
@ -80,6 +82,10 @@ static void process_oauth2_success(void* state, grpc_auth_context*,
s = static_cast<test_processor_state*>(state); s = static_cast<test_processor_state*>(state);
GPR_ASSERT(s->pseudo_refcount == 1); GPR_ASSERT(s->pseudo_refcount == 1);
GPR_ASSERT(oauth2 != nullptr); GPR_ASSERT(oauth2 != nullptr);
grpc_auth_context_add_cstring_property(ctx, client_identity_property_name,
client_identity);
GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(
ctx, client_identity_property_name) == 1);
cb(user_data, oauth2, 1, nullptr, 0, GRPC_STATUS_OK, nullptr); cb(user_data, oauth2, 1, nullptr, 0, GRPC_STATUS_OK, nullptr);
} }

@ -45,6 +45,8 @@
#define SERVER_KEY_PATH "src/core/tsi/test_creds/server1.key" #define SERVER_KEY_PATH "src/core/tsi/test_creds/server1.key"
static const char oauth2_md[] = "Bearer aaslkfjs424535asdf"; static const char oauth2_md[] = "Bearer aaslkfjs424535asdf";
static const char* client_identity_property_name = "smurf_name";
static const char* client_identity = "Brainy Smurf";
struct fullstack_secure_fixture_data { struct fullstack_secure_fixture_data {
std::string localaddr; std::string localaddr;
@ -68,7 +70,7 @@ typedef struct {
size_t pseudo_refcount; size_t pseudo_refcount;
} test_processor_state; } test_processor_state;
static void process_oauth2_success(void* state, grpc_auth_context*, static void process_oauth2_success(void* state, grpc_auth_context* ctx,
const grpc_metadata* md, size_t md_count, const grpc_metadata* md, size_t md_count,
grpc_process_auth_metadata_done_cb cb, grpc_process_auth_metadata_done_cb cb,
void* user_data) { void* user_data) {
@ -80,6 +82,10 @@ static void process_oauth2_success(void* state, grpc_auth_context*,
s = static_cast<test_processor_state*>(state); s = static_cast<test_processor_state*>(state);
GPR_ASSERT(s->pseudo_refcount == 1); GPR_ASSERT(s->pseudo_refcount == 1);
GPR_ASSERT(oauth2 != nullptr); GPR_ASSERT(oauth2 != nullptr);
grpc_auth_context_add_cstring_property(ctx, client_identity_property_name,
client_identity);
GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(
ctx, client_identity_property_name) == 1);
cb(user_data, oauth2, 1, nullptr, 0, GRPC_STATUS_OK, nullptr); cb(user_data, oauth2, 1, nullptr, 0, GRPC_STATUS_OK, nullptr);
} }

@ -210,7 +210,7 @@ static void on_p2s_sent_message(void* arg, int success) {
grpc_op op; grpc_op op;
grpc_call_error err; grpc_call_error err;
grpc_byte_buffer_destroy(std::exchange(pc->c2p_msg, nullptr)); grpc_byte_buffer_destroy(pc->c2p_msg);
if (!pc->proxy->shutdown && success) { if (!pc->proxy->shutdown && success) {
op.op = GRPC_OP_RECV_MESSAGE; op.op = GRPC_OP_RECV_MESSAGE;
op.flags = 0; op.flags = 0;

@ -21,7 +21,6 @@
#include <string.h> #include <string.h>
#include <algorithm> #include <algorithm>
#include <memory>
#include <vector> #include <vector>
#include "absl/status/status.h" #include "absl/status/status.h"
@ -41,10 +40,7 @@
#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
#include "test/core/end2end/cq_verifier.h" #include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h" #include "test/core/end2end/end2end_tests.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
@ -452,23 +448,12 @@ static grpc_error_handle init_channel_elem(
static void destroy_channel_elem(grpc_channel_element* /*elem*/) {} static void destroy_channel_elem(grpc_channel_element* /*elem*/) {}
static const grpc_channel_filter test_filter = { static const grpc_channel_filter test_filter = {
grpc_call_next_op, grpc_call_next_op, nullptr,
[](grpc_channel_element*, grpc_core::CallArgs, grpc_channel_next_op, 0,
grpc_core::NextPromiseFactory) init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set,
-> grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> { destroy_call_elem, 0,
return grpc_core::Immediate(grpc_core::ServerMetadataFromStatus( init_channel_elem, grpc_channel_stack_no_post_init,
absl::PermissionDeniedError("access denied"))); destroy_channel_elem, grpc_channel_next_get_info,
},
grpc_channel_next_op,
0,
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0,
init_channel_elem,
grpc_channel_stack_no_post_init,
destroy_channel_elem,
grpc_channel_next_get_info,
"filter_init_fails"}; "filter_init_fails"};
//****************************************************************************** //******************************************************************************

@ -128,9 +128,6 @@ static void test_max_message_length_on_request(grpc_end2end_test_config config,
grpc_status_code status; grpc_status_code status;
grpc_call_error error; grpc_call_error error;
grpc_slice details; grpc_slice details;
grpc_slice expect_in_details = grpc_slice_from_copied_string(
send_limit ? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)");
int was_cancelled = 2; int was_cancelled = 2;
grpc_channel_args* client_args = nullptr; grpc_channel_args* client_args = nullptr;
@ -269,10 +266,13 @@ static void test_max_message_length_on_request(grpc_end2end_test_config config,
done: done:
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED); GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(grpc_slice_slice(details, expect_in_details) >= 0); GPR_ASSERT(
grpc_slice_str_cmp(
details, send_limit
? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)") == 0);
grpc_slice_unref(details); grpc_slice_unref(details);
grpc_slice_unref(expect_in_details);
grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv); grpc_metadata_array_destroy(&request_metadata_recv);
@ -316,9 +316,6 @@ static void test_max_message_length_on_response(grpc_end2end_test_config config,
grpc_status_code status; grpc_status_code status;
grpc_call_error error; grpc_call_error error;
grpc_slice details; grpc_slice details;
grpc_slice expect_in_details = grpc_slice_from_copied_string(
send_limit ? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)");
int was_cancelled = 2; int was_cancelled = 2;
grpc_channel_args* client_args = nullptr; grpc_channel_args* client_args = nullptr;
@ -458,10 +455,13 @@ static void test_max_message_length_on_response(grpc_end2end_test_config config,
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED); GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(grpc_slice_slice(details, expect_in_details) >= 0); GPR_ASSERT(
grpc_slice_str_cmp(
details, send_limit
? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)") == 0);
grpc_slice_unref(details); grpc_slice_unref(details);
grpc_slice_unref(expect_in_details);
grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv); grpc_metadata_array_destroy(&request_metadata_recv);

@ -40,14 +40,10 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name, const char* test_name,
grpc_channel_args* client_args, grpc_channel_args* client_args,
grpc_channel_args* server_args, grpc_channel_args* server_args,
bool request_status_early, bool request_status_early) {
bool recv_message_separately) {
grpc_end2end_test_fixture f; grpc_end2end_test_fixture f;
gpr_log( gpr_log(GPR_INFO, "Running test: %s/%s/request_status_early=%s", test_name,
GPR_INFO, config.name, request_status_early ? "true" : "false");
"Running test: %s/%s/request_status_early=%s/recv_message_separately=%s",
test_name, config.name, request_status_early ? "true" : "false",
recv_message_separately ? "true" : "false");
f = config.create_fixture(client_args, server_args); f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args); config.init_server(&f, server_args);
config.init_client(&f, client_args); config.init_client(&f, client_args);
@ -112,7 +108,7 @@ static void test(grpc_end2end_test_config config, bool request_status_early,
grpc_raw_byte_buffer_create(&response_payload2_slice, 1); grpc_raw_byte_buffer_create(&response_payload2_slice, 1);
grpc_end2end_test_fixture f = grpc_end2end_test_fixture f =
begin_test(config, "streaming_error_response", nullptr, nullptr, begin_test(config, "streaming_error_response", nullptr, nullptr,
request_status_early, recv_message_separately); request_status_early);
grpc_core::CqVerifier cqv(f.cq); grpc_core::CqVerifier cqv(f.cq);
grpc_op ops[6]; grpc_op ops[6];
grpc_op* op; grpc_op* op;

@ -155,8 +155,7 @@ TEST_F(ClientAuthFilterTest, CallCredsFails) {
auto promise = filter->MakeCallPromise( auto promise = filter->MakeCallPromise(
CallArgs{ClientMetadataHandle(&initial_metadata_batch_, CallArgs{ClientMetadataHandle(&initial_metadata_batch_,
Arena::PooledDeleter(nullptr)), Arena::PooledDeleter(nullptr)),
ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr, nullptr, nullptr, nullptr},
nullptr},
[&](CallArgs /*call_args*/) { [&](CallArgs /*call_args*/) {
return ArenaPromise<ServerMetadataHandle>( return ArenaPromise<ServerMetadataHandle>(
[&]() -> Poll<ServerMetadataHandle> { [&]() -> Poll<ServerMetadataHandle> {
@ -186,8 +185,7 @@ TEST_F(ClientAuthFilterTest, RewritesInvalidStatusFromCallCreds) {
auto promise = filter->MakeCallPromise( auto promise = filter->MakeCallPromise(
CallArgs{ClientMetadataHandle(&initial_metadata_batch_, CallArgs{ClientMetadataHandle(&initial_metadata_batch_,
Arena::PooledDeleter(nullptr)), Arena::PooledDeleter(nullptr)),
ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr, nullptr, nullptr, nullptr},
nullptr},
[&](CallArgs /*call_args*/) { [&](CallArgs /*call_args*/) {
return ArenaPromise<ServerMetadataHandle>( return ArenaPromise<ServerMetadataHandle>(
[&]() -> Poll<ServerMetadataHandle> { [&]() -> Poll<ServerMetadataHandle> {

@ -72,8 +72,7 @@ TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) {
auto promise = filter.MakeCallPromise( auto promise = filter.MakeCallPromise(
CallArgs{ClientMetadataHandle(&initial_metadata_batch, CallArgs{ClientMetadataHandle(&initial_metadata_batch,
Arena::PooledDeleter(nullptr)), Arena::PooledDeleter(nullptr)),
ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr, nullptr, nullptr, nullptr},
nullptr},
[&](CallArgs call_args) { [&](CallArgs call_args) {
EXPECT_EQ(call_args.client_initial_metadata EXPECT_EQ(call_args.client_initial_metadata
->get_pointer(HttpAuthorityMetadata()) ->get_pointer(HttpAuthorityMetadata())
@ -108,8 +107,7 @@ TEST(ClientAuthorityFilterTest,
auto promise = filter.MakeCallPromise( auto promise = filter.MakeCallPromise(
CallArgs{ClientMetadataHandle(&initial_metadata_batch, CallArgs{ClientMetadataHandle(&initial_metadata_batch,
Arena::PooledDeleter(nullptr)), Arena::PooledDeleter(nullptr)),
ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr, nullptr, nullptr, nullptr},
nullptr},
[&](CallArgs call_args) { [&](CallArgs call_args) {
EXPECT_EQ(call_args.client_initial_metadata EXPECT_EQ(call_args.client_initial_metadata
->get_pointer(HttpAuthorityMetadata()) ->get_pointer(HttpAuthorityMetadata())

@ -477,7 +477,6 @@ class MainLoop {
auto* server_initial_metadata = arena_->New<Pipe<ServerMetadataHandle>>(); auto* server_initial_metadata = arena_->New<Pipe<ServerMetadataHandle>>();
CallArgs call_args{std::move(*LoadMetadata(client_initial_metadata, CallArgs call_args{std::move(*LoadMetadata(client_initial_metadata,
&client_initial_metadata_)), &client_initial_metadata_)),
ClientInitialMetadataOutstandingToken::Empty(),
&server_initial_metadata->sender, nullptr, nullptr}; &server_initial_metadata->sender, nullptr, nullptr};
if (is_client) { if (is_client) {
promise_ = main_loop_->channel_stack_->MakeClientCallPromise( promise_ = main_loop_->channel_stack_->MakeClientCallPromise(

Loading…
Cancel
Save