From 9eadf42dbfece3b0682a561f67c76f1722275b8b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 4 Dec 2023 22:10:48 -0800 Subject: [PATCH 1/4] [promises] Add an API to access new style filters (#35200) Will be used during this transition time to run 5-pipe style filters somewhat more natively. Once everything is getting closer to 5-pipes, we'll drop this method and have the channel stack understand how to create an interception-map that can be reused per-call, instead of creating the interception-map every time a call is created. Closes #35200 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35200 from ctiller:cg-channel-filter-api 2fc11dd273f0bafcfffbd713284f042db947f79a PiperOrigin-RevId: 587940947 --- .../filters/client_channel/client_channel.cc | 3 + .../filters/client_channel/retry_filter.cc | 1 + .../ext/filters/deadline/deadline_filter.cc | 2 + src/core/lib/channel/channel_stack.h | 6 + src/core/lib/channel/connected_channel.cc | 1 + src/core/lib/channel/promise_based_filter.h | 203 +++++++++++++++++- src/core/lib/surface/call_trace.cc | 3 +- src/core/lib/surface/server.cc | 1 + src/core/lib/transport/transport.h | 70 ++++++ .../channel/channel_stack_builder_test.cc | 4 +- test/core/channel/channel_stack_test.cc | 1 + .../core/end2end/tests/filter_causes_close.cc | 1 + test/core/end2end/tests/filter_context.cc | 2 +- test/core/end2end/tests/filter_init_fails.cc | 2 +- ...retry_cancel_with_multiple_send_batches.cc | 1 + .../tests/retry_recv_message_replay.cc | 1 + .../retry_recv_trailing_metadata_error.cc | 1 + .../core/end2end/tests/retry_send_op_fails.cc | 1 + .../end2end/tests/retry_transparent_goaway.cc | 1 + .../retry_transparent_not_sent_on_wire.cc | 1 + test/core/surface/channel_init_test.cc | 6 +- .../transport/chttp2/streams_not_seen_test.cc | 1 + .../xds/xds_channel_stack_modifier_test.cc | 8 +- 23 files changed, 303 insertions(+), 18 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 197173aa7e0..91b4712c61a 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -407,6 +407,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { const grpc_channel_filter ClientChannel::kFilterVtableWithPromises = { ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch, ClientChannel::MakeCallPromise, + /* init_call: */ nullptr, ClientChannel::StartTransportOp, sizeof(ClientChannel::FilterBasedCallData), ClientChannel::FilterBasedCallData::Init, @@ -423,6 +424,7 @@ const grpc_channel_filter ClientChannel::kFilterVtableWithPromises = { const grpc_channel_filter ClientChannel::kFilterVtableWithoutPromises = { ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch, nullptr, + /* init_call: */ nullptr, ClientChannel::StartTransportOp, sizeof(ClientChannel::FilterBasedCallData), ClientChannel::FilterBasedCallData::Init, @@ -570,6 +572,7 @@ class DynamicTerminationFilter::CallData { const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = { DynamicTerminationFilter::CallData::StartTransportStreamOpBatch, DynamicTerminationFilter::MakeCallPromise, + /* init_call: */ nullptr, DynamicTerminationFilter::StartTransportOp, sizeof(DynamicTerminationFilter::CallData), DynamicTerminationFilter::CallData::Init, diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index cb29fbee2a4..f2d393f9484 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -143,6 +143,7 @@ const RetryMethodConfig* RetryFilter::GetRetryPolicy( const grpc_channel_filter RetryFilter::kVtable = { RetryFilter::LegacyCallData::StartTransportStreamOpBatch, nullptr, + /* init_call: */ nullptr, RetryFilter::StartTransportOp, sizeof(RetryFilter::LegacyCallData), RetryFilter::LegacyCallData::Init, diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index 94a4f65225e..50ce988173e 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -343,6 +343,7 @@ const grpc_channel_filter grpc_client_deadline_filter = { grpc_core::NextPromiseFactory next_promise_factory) { return next_promise_factory(std::move(call_args)); }, + /* init_call: */ nullptr, grpc_channel_next_op, sizeof(grpc_deadline_state), deadline_init_call_elem, @@ -368,6 +369,7 @@ const grpc_channel_filter grpc_server_deadline_filter = { } return next_promise_factory(std::move(call_args)); }, + /* init_call: */ nullptr, grpc_channel_next_op, sizeof(server_call_data), deadline_init_call_elem, diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 918628856b0..5b6649a9eac 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -128,6 +128,12 @@ struct grpc_channel_filter { grpc_core::ArenaPromise (*make_call_promise)( grpc_channel_element* elem, grpc_core::CallArgs call_args, grpc_core::NextPromiseFactory next_promise_factory); + // Register interceptors into a call. + // If this is non-null it may be used in preference to make_call_promise. + // There is an on-going migration to move all filters to providing this, and + // then to drop start_transport_stream_op_batch. + void (*init_call)(grpc_channel_element* elem, + grpc_core::CallSpineInterface* call_spine); // Called to handle channel level operations - e.g. new calls, or transport // closure. // See grpc_channel_next_op on how to call the next element in the stack diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index afca6eda5fc..0ceba07e075 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -857,6 +857,7 @@ grpc_channel_filter MakeConnectedFilter() { return { connected_channel_start_transport_stream_op_batch, make_call_promise != nullptr ? make_call_wrapper : nullptr, + /* init_call: */ nullptr, connected_channel_start_transport_op, sizeof(call_data), connected_channel_init_call_elem, diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index a94b79bbd0c..fa015fc6d3d 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -345,15 +345,77 @@ inline auto RunCall(void (Derived::Call::*fn)(ClientMetadata& md, } inline void InterceptClientToServerMessage(const NoInterceptor*, void*, - CallArgs&) {} + const CallArgs&) {} +inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*, + CallSpineInterface*) {} + +inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*, + CallSpineInterface*) {} + +template +inline void InterceptClientInitialMetadata( + void (Derived::Call::*fn)(ClientMetadata& md), typename Derived::Call* call, + Derived*, CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); + call_spine->client_initial_metadata().receiver.InterceptAndMap( + [call](ClientMetadataHandle md) { + call->OnClientInitialMetadata(*md); + return md; + }); +} + +template +inline void InterceptClientInitialMetadata( + void (Derived::Call::*fn)(ClientMetadata& md, Derived* channel), + typename Derived::Call* call, Derived* channel, + CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); + call_spine->client_initial_metadata().receiver.InterceptAndMap( + [call, channel](ClientMetadataHandle md) { + call->OnClientInitialMetadata(*md, channel); + return md; + }); +} + +template +inline void InterceptClientInitialMetadata( + ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md), + typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); + call_spine->client_initial_metadata().receiver.InterceptAndMap( + [call_spine, + call](ClientMetadataHandle md) -> absl::optional { + auto return_md = call->OnClientInitialMetadata(*md); + if (return_md == nullptr) return std::move(md); + return call_spine->Cancel(std::move(return_md)); + }); +} + +template +inline void InterceptClientInitialMetadata( + ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md, + Derived* channel), + typename Derived::Call* call, Derived* channel, + CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); + call_spine->client_initial_metadata().receiver.InterceptAndMap( + [call_spine, call, channel]( + ClientMetadataHandle md) -> absl::optional { + auto return_md = call->OnClientInitialMetadata(*md, channel); + if (return_md == nullptr) return std::move(md); + return call_spine->Cancel(std::move(return_md)); + }); +} + +template inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, - CallArgs&) {} + const CallArgs&) {} template inline void InterceptServerInitialMetadata( void (Derived::Call::*fn)(ServerMetadata&), - FilterCallData* call_data, CallArgs& call_args) { + FilterCallData* call_data, const CallArgs& call_args) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); call_args.server_initial_metadata->InterceptAndMap( [call_data](ServerMetadataHandle md) { @@ -365,7 +427,7 @@ inline void InterceptServerInitialMetadata( template inline void InterceptServerInitialMetadata( absl::Status (Derived::Call::*fn)(ServerMetadata&), - FilterCallData* call_data, CallArgs& call_args) { + FilterCallData* call_data, const CallArgs& call_args) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); call_args.server_initial_metadata->InterceptAndMap( [call_data]( @@ -379,8 +441,69 @@ inline void InterceptServerInitialMetadata( }); } +template +inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*, + CallSpineInterface*) {} + +template +inline void InterceptServerInitialMetadata( + void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call, + Derived*, CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); + call_spine->server_initial_metadata().sender.InterceptAndMap( + [call](ServerMetadataHandle md) { + call->OnServerInitialMetadata(*md); + return md; + }); +} + +template +inline void InterceptServerInitialMetadata( + absl::Status (Derived::Call::*fn)(ServerMetadata&), + typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); + call_spine->server_initial_metadata().sender.InterceptAndMap( + [call, call_spine]( + ServerMetadataHandle md) -> absl::optional { + auto status = call->OnServerInitialMetadata(*md); + if (status.ok()) return std::move(md); + return call_spine->Cancel(ServerMetadataFromStatus(status)); + }); +} + inline void InterceptServerToClientMessage(const NoInterceptor*, void*, - CallArgs&) {} + const CallArgs&) {} + +inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*, + CallSpineInterface*) {} + +inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*, + CallSpineInterface*) {} + +template +inline void InterceptServerTrailingMetadata( + void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call, + Derived*, CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata); + call_spine->server_trailing_metadata().sender.InterceptAndMap( + [call](ServerMetadataHandle md) { + call->OnServerTrailingMetadata(*md); + return md; + }); +} + +template +inline void InterceptServerTrailingMetadata( + absl::Status (Derived::Call::*fn)(ServerMetadata&), + typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata); + call_spine->server_trailing_metadata().sender.InterceptAndMap( + [call](ServerMetadataHandle md) -> absl::optional { + auto status = call->OnServerTrailingMetadata(*md); + if (status.ok()) return std::move(md); + return ServerMetadataFromStatus(status); + }); +} template absl::enable_if_t>::value, @@ -449,6 +572,29 @@ MakeFilterCall(Derived* derived) { template class ImplementChannelFilter : public ChannelFilter { public: + // Natively construct a v3 call. + void InitCall(CallSpineInterface* call_spine) { + auto* call = GetContext()->ManagedNew(); + promise_filter_detail::InterceptClientInitialMetadata( + &Derived::Call::OnClientInitialMetadata, call, + static_cast(this), call_spine); + promise_filter_detail::InterceptClientToServerMessage( + &Derived::Call::OnClientToServerMessage, call, + static_cast(this), call_spine); + promise_filter_detail::InterceptServerInitialMetadata( + &Derived::Call::OnServerInitialMetadata, call, + static_cast(this), call_spine); + promise_filter_detail::InterceptServerToClientMessage( + &Derived::Call::OnServerToClientMessage, call, + static_cast(this), call_spine); + promise_filter_detail::InterceptServerTrailingMetadata( + &Derived::Call::OnServerTrailingMetadata, call, + static_cast(this), call_spine); + } + + // Polyfill for the original promise scheme. + // Allows writing v3 filters that work with v2 stacks. + // (and consequently also v1 stacks since we can polyfill back to that too). ArenaPromise MakeCallPromise( CallArgs call_args, NextPromiseFactory next_promise_factory) final { auto* call = promise_filter_detail::MakeFilterCall( @@ -1262,7 +1408,49 @@ struct ChannelFilterWithFlagsMethods { // ChannelArgs channel_args, ChannelFilter::Args filter_args); // }; template -absl::enable_if_t::value, grpc_channel_filter> +absl::enable_if_t::value && + !std::is_base_of, F>::value, + grpc_channel_filter> +MakePromiseBasedFilter(const char* name) { + using CallData = promise_filter_detail::CallData; + + return grpc_channel_filter{ + // start_transport_stream_op_batch + promise_filter_detail::BaseCallDataMethods::StartTransportStreamOpBatch, + // make_call_promise + promise_filter_detail::ChannelFilterMethods::MakeCallPromise, + nullptr, + // start_transport_op + promise_filter_detail::ChannelFilterMethods::StartTransportOp, + // sizeof_call_data + sizeof(CallData), + // init_call_elem + promise_filter_detail::CallDataFilterWithFlagsMethods< + CallData, kFlags>::InitCallElem, + // set_pollset_or_pollset_set + promise_filter_detail::BaseCallDataMethods::SetPollsetOrPollsetSet, + // destroy_call_elem + promise_filter_detail::CallDataFilterWithFlagsMethods< + CallData, kFlags>::DestroyCallElem, + // sizeof_channel_data + sizeof(F), + // init_channel_elem + promise_filter_detail::ChannelFilterWithFlagsMethods< + F, kFlags>::InitChannelElem, + // post_init_channel_elem + promise_filter_detail::ChannelFilterMethods::PostInitChannelElem, + // destroy_channel_elem + promise_filter_detail::ChannelFilterMethods::DestroyChannelElem, + // get_channel_info + promise_filter_detail::ChannelFilterMethods::GetChannelInfo, + // name + name, + }; +} + +template +absl::enable_if_t, F>::value, + grpc_channel_filter> MakePromiseBasedFilter(const char* name) { using CallData = promise_filter_detail::CallData; @@ -1271,6 +1459,9 @@ MakePromiseBasedFilter(const char* name) { promise_filter_detail::BaseCallDataMethods::StartTransportStreamOpBatch, // make_call_promise promise_filter_detail::ChannelFilterMethods::MakeCallPromise, + [](grpc_channel_element* elem, CallSpineInterface* args) { + static_cast(elem->channel_data)->InitCall(args); + }, // start_transport_op promise_filter_detail::ChannelFilterMethods::StartTransportOp, // sizeof_call_data diff --git a/src/core/lib/surface/call_trace.cc b/src/core/lib/surface/call_trace.cc index 163418f5083..d9b544ee1f9 100644 --- a/src/core/lib/surface/call_trace.cc +++ b/src/core/lib/surface/call_trace.cc @@ -77,7 +77,8 @@ const grpc_channel_filter* PromiseTracingFilterFor( return r; }; }, - grpc_channel_next_op, /* sizeof_call_data: */ 0, + /* init_call: */ nullptr, grpc_channel_next_op, + /* sizeof_call_data: */ 0, // init_call_elem: [](grpc_call_element*, const grpc_call_element_args*) { return absl::OkStatus(); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 2402f45e9da..a56f58d98e4 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -757,6 +757,7 @@ class ChannelBroadcaster { const grpc_channel_filter Server::kServerTopFilter = { Server::CallData::StartTransportStreamOpBatch, Server::ChannelData::MakeCallPromise, + /* init_call: */ nullptr, grpc_channel_next_op, sizeof(Server::CallData), Server::CallData::InitCallElement, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 6f451a9ae4b..fea659ff7e9 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -55,6 +55,7 @@ #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/detail/status.h" #include "src/core/lib/promise/latch.h" +#include "src/core/lib/promise/party.h" #include "src/core/lib/promise/pipe.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_buffer.h" @@ -228,6 +229,75 @@ struct CallArgs { using NextPromiseFactory = std::function(CallArgs)>; +// The common middle part of a call - a reference is held by each of +// CallInitiator and CallHandler - which provide interfaces that are appropriate +// for each side of a call. +// The spine will ultimately host the pipes, filters, and context for one part +// of a call: ie top-half client channel, sub channel call, server call. +// TODO(ctiller): eventually drop this when we don't need to reference into +// legacy promise calls anymore +class CallSpineInterface { + public: + virtual ~CallSpineInterface() = default; + virtual Pipe& client_initial_metadata() = 0; + virtual Pipe& server_initial_metadata() = 0; + virtual Pipe& client_to_server_messages() = 0; + virtual Pipe& server_to_client_messages() = 0; + virtual Pipe& server_trailing_metadata() = 0; + // Cancel the call with the given metadata. + // Regarding the `MUST_USE_RESULT absl::nullopt_t`: + // Most cancellation calls right now happen in pipe interceptors; + // there `nullopt` indicates terminate processing of this pipe and close with + // error. + // It's convenient then to have the Cancel operation (setting the latch to + // terminate the call) be the last thing that occurs in a pipe interceptor, + // and this construction supports that (and has helped the author not write + // some bugs). + GRPC_MUST_USE_RESULT virtual absl::nullopt_t Cancel( + ServerMetadataHandle metadata) = 0; + virtual Party& party() = 0; +}; + +class CallSpine final : public CallSpineInterface { + public: + Pipe& client_initial_metadata() override { + return client_initial_metadata_; + } + Pipe& server_initial_metadata() override { + return server_initial_metadata_; + } + Pipe& client_to_server_messages() override { + return client_to_server_messages_; + } + Pipe& server_to_client_messages() override { + return server_to_client_messages_; + } + Pipe& server_trailing_metadata() override { + return server_trailing_metadata_; + } + absl::nullopt_t Cancel(ServerMetadataHandle metadata) override { + GPR_DEBUG_ASSERT(Activity::current() == &party()); + if (cancel_latch_.is_set()) return absl::nullopt; + cancel_latch_.Set(std::move(metadata)); + return absl::nullopt; + } + Party& party() override { Crash("unimplemented"); } + + private: + // Initial metadata from client to server + Pipe client_initial_metadata_; + // Initial metadata from server to client + Pipe server_initial_metadata_; + // Messages travelling from the application to the transport. + Pipe client_to_server_messages_; + // Messages travelling from the transport to the application. + Pipe server_to_client_messages_; + // Trailing metadata from server to client + Pipe server_trailing_metadata_; + // Latch that can be set to terminate the call + Latch cancel_latch_; +}; + } // namespace grpc_core // forward declarations diff --git a/test/core/channel/channel_stack_builder_test.cc b/test/core/channel/channel_stack_builder_test.cc index 4e253b832d0..65d4466eff9 100644 --- a/test/core/channel/channel_stack_builder_test.cc +++ b/test/core/channel/channel_stack_builder_test.cc @@ -62,8 +62,8 @@ const grpc_channel_filter* FilterNamed(const char* name) { ->emplace( name, new grpc_channel_filter{ - grpc_call_next_op, nullptr, grpc_channel_next_op, 0, CallInitFunc, - grpc_call_stack_ignore_set_pollset_or_pollset_set, + grpc_call_next_op, nullptr, nullptr, grpc_channel_next_op, 0, + CallInitFunc, grpc_call_stack_ignore_set_pollset_or_pollset_set, CallDestroyFunc, 0, ChannelInitFunc, [](grpc_channel_stack*, grpc_channel_element*) {}, ChannelDestroyFunc, grpc_channel_next_get_info, name}) diff --git a/test/core/channel/channel_stack_test.cc b/test/core/channel/channel_stack_test.cc index 7ed34f60b64..4054e6a8844 100644 --- a/test/core/channel/channel_stack_test.cc +++ b/test/core/channel/channel_stack_test.cc @@ -83,6 +83,7 @@ TEST(ChannelStackTest, CreateChannelStack) { const grpc_channel_filter filter = { call_func, nullptr, + nullptr, channel_func, sizeof(int), call_init_func, diff --git a/test/core/end2end/tests/filter_causes_close.cc b/test/core/end2end/tests/filter_causes_close.cc index 751507495de..91b917ced3b 100644 --- a/test/core/end2end/tests/filter_causes_close.cc +++ b/test/core/end2end/tests/filter_causes_close.cc @@ -102,6 +102,7 @@ const grpc_channel_filter test_filter = { return Immediate(ServerMetadataFromStatus( absl::PermissionDeniedError("Failure that's not preventable."))); }, + nullptr, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/test/core/end2end/tests/filter_context.cc b/test/core/end2end/tests/filter_context.cc index 8156384fbd6..1da07e814e9 100644 --- a/test/core/end2end/tests/filter_context.cc +++ b/test/core/end2end/tests/filter_context.cc @@ -82,7 +82,7 @@ grpc_error_handle init_channel_elem(grpc_channel_element* /*elem*/, void destroy_channel_elem(grpc_channel_element* /*elem*/) {} const grpc_channel_filter test_filter = { - start_transport_stream_op_batch, nullptr, grpc_channel_next_op, + start_transport_stream_op_batch, nullptr, nullptr, grpc_channel_next_op, sizeof(call_data), init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, 0, init_channel_elem, grpc_channel_stack_no_post_init, destroy_channel_elem, diff --git a/test/core/end2end/tests/filter_init_fails.cc b/test/core/end2end/tests/filter_init_fails.cc index a640bebc5f0..a160b38b60a 100644 --- a/test/core/end2end/tests/filter_init_fails.cc +++ b/test/core/end2end/tests/filter_init_fails.cc @@ -80,7 +80,7 @@ const grpc_channel_filter test_filter = { return Immediate(ServerMetadataFromStatus( absl::PermissionDeniedError("access denied"))); }, - grpc_channel_next_op, 0, init_call_elem, + nullptr, grpc_channel_next_op, 0, init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, 0, init_channel_elem, grpc_channel_stack_no_post_init, destroy_channel_elem, grpc_channel_next_get_info, diff --git a/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc b/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc index 85948b208c5..22de5660e05 100644 --- a/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc +++ b/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc @@ -159,6 +159,7 @@ class FailSendOpsFilter { grpc_channel_filter FailSendOpsFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, nullptr, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/end2end/tests/retry_recv_message_replay.cc b/test/core/end2end/tests/retry_recv_message_replay.cc index 74e5f98bfc7..befb2391dde 100644 --- a/test/core/end2end/tests/retry_recv_message_replay.cc +++ b/test/core/end2end/tests/retry_recv_message_replay.cc @@ -109,6 +109,7 @@ class FailFirstSendOpFilter { grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, nullptr, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc b/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc index 8a277ff4b6d..7a43114ec3a 100644 --- a/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc +++ b/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc @@ -104,6 +104,7 @@ class InjectStatusFilter { grpc_channel_filter InjectStatusFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, nullptr, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/end2end/tests/retry_send_op_fails.cc b/test/core/end2end/tests/retry_send_op_fails.cc index 3bb2715dc5a..2fdc3818694 100644 --- a/test/core/end2end/tests/retry_send_op_fails.cc +++ b/test/core/end2end/tests/retry_send_op_fails.cc @@ -110,6 +110,7 @@ class FailFirstCallFilter { grpc_channel_filter FailFirstCallFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, nullptr, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/end2end/tests/retry_transparent_goaway.cc b/test/core/end2end/tests/retry_transparent_goaway.cc index 98d3f744d18..091a800f210 100644 --- a/test/core/end2end/tests/retry_transparent_goaway.cc +++ b/test/core/end2end/tests/retry_transparent_goaway.cc @@ -116,6 +116,7 @@ class FailFirstCallFilter { grpc_channel_filter FailFirstCallFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, nullptr, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc b/test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc index 87542064ad4..05bf6c1ac12 100644 --- a/test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc +++ b/test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc @@ -115,6 +115,7 @@ class FailFirstTenCallsFilter { grpc_channel_filter FailFirstTenCallsFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, nullptr, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/surface/channel_init_test.cc b/test/core/surface/channel_init_test.cc index 22fdad11da9..bab56a38d3a 100644 --- a/test/core/surface/channel_init_test.cc +++ b/test/core/surface/channel_init_test.cc @@ -35,9 +35,9 @@ const grpc_channel_filter* FilterNamed(const char* name) { if (it != filters->end()) return it->second; return filters ->emplace(name, - new grpc_channel_filter{nullptr, nullptr, nullptr, 0, nullptr, - nullptr, nullptr, 0, nullptr, nullptr, - nullptr, nullptr, name}) + new grpc_channel_filter{nullptr, nullptr, nullptr, nullptr, 0, + nullptr, nullptr, nullptr, 0, nullptr, + nullptr, nullptr, nullptr, name}) .first->second; } diff --git a/test/core/transport/chttp2/streams_not_seen_test.cc b/test/core/transport/chttp2/streams_not_seen_test.cc index e38160d4b41..420686ea6e6 100644 --- a/test/core/transport/chttp2/streams_not_seen_test.cc +++ b/test/core/transport/chttp2/streams_not_seen_test.cc @@ -199,6 +199,7 @@ class TrailingMetadataRecordingFilter { grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, nullptr, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/xds/xds_channel_stack_modifier_test.cc b/test/core/xds/xds_channel_stack_modifier_test.cc index ab6c78242f0..ffd0b9f450d 100644 --- a/test/core/xds/xds_channel_stack_modifier_test.cc +++ b/test/core/xds/xds_channel_stack_modifier_test.cc @@ -95,11 +95,11 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) { grpc_init(); // Add 2 test filters to XdsChannelStackModifier const grpc_channel_filter test_filter_1 = { - nullptr, nullptr, nullptr, 0, nullptr, nullptr, nullptr, - 0, nullptr, nullptr, nullptr, nullptr, kTestFilter1}; + nullptr, nullptr, nullptr, nullptr, 0, nullptr, nullptr, + nullptr, 0, nullptr, nullptr, nullptr, nullptr, kTestFilter1}; const grpc_channel_filter test_filter_2 = { - nullptr, nullptr, nullptr, 0, nullptr, nullptr, nullptr, - 0, nullptr, nullptr, nullptr, nullptr, kTestFilter2}; + nullptr, nullptr, nullptr, nullptr, 0, nullptr, nullptr, + nullptr, 0, nullptr, nullptr, nullptr, nullptr, kTestFilter2}; auto channel_stack_modifier = MakeRefCounted( std::vector{&test_filter_1, &test_filter_2}); grpc_arg arg = channel_stack_modifier->MakeChannelArg(); From 2fa8018d2a0e0d97f3b4015f5a792f24912a7d79 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 5 Dec 2023 11:11:55 -0800 Subject: [PATCH 2/4] [call-v3] Introduce some cancellation helpers to CallSpine (#35212) These will be used in upcoming changes to allow easy spawning of promises into parties that properly cancel out things when those promises fail. Closes #35212 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35212 from ctiller:cg-cancel-nicely 32fc7aa09a6de4b5dd3933ec09791f2bfdfa6a74 PiperOrigin-RevId: 588130225 --- src/core/lib/transport/transport.h | 42 ++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index fea659ff7e9..a89b2084ba2 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -256,6 +256,48 @@ class CallSpineInterface { GRPC_MUST_USE_RESULT virtual absl::nullopt_t Cancel( ServerMetadataHandle metadata) = 0; virtual Party& party() = 0; + + // Wrap a promise so that if it returns failure it automatically cancels + // the rest of the call. + // The resulting (returned) promise will resolve to Empty. + template + auto CancelIfFails(Promise promise) { + GPR_DEBUG_ASSERT(Activity::current() == &party()); + using P = promise_detail::PromiseLike; + using ResultType = typename P::Result; + return Map(std::move(promise), [this](ResultType r) { + if (!IsStatusOk(r)) { + std::ignore = Cancel(StatusCast(std::move(r))); + } + return Empty{}; + }); + } + + // Spawn a promise that returns Empty{} and save some boilerplate handling + // that detail. + template + void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { + party().Spawn(name, std::move(promise_factory), [](Empty) {}); + } + + // Spawn a promise that returns some status-like type; if the status + // represents failure automatically cancel the rest of the call. + template + void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { + using FactoryType = + promise_detail::OncePromiseFactory; + using PromiseType = typename FactoryType::Promise; + using ResultType = typename PromiseType::Result; + static_assert( + std::is_same()))>::value, + "SpawnGuarded promise must return a status-like object"); + party().Spawn(name, std::move(promise_factory), [this](ResultType r) { + if (!IsStatusOk(r)) { + std::ignore = Cancel(StatusCast(std::move(r))); + } + }); + } }; class CallSpine final : public CallSpineInterface { From 1a086609d7083b2e6413b397fc3239adea3d66f2 Mon Sep 17 00:00:00 2001 From: Eugene Ostroukhov Date: Tue, 5 Dec 2023 12:00:25 -0800 Subject: [PATCH 3/4] [server] Fix typo in the class name (#35216) Closes #35216 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35216 from eugeneo:tasks/typo-aync d9f58bc130f4e0ce337b16826caff7daac0a55e2 PiperOrigin-RevId: 588146516 --- src/cpp/server/secure_server_credentials.cc | 20 ++++++++++---------- src/cpp/server/secure_server_credentials.h | 6 +++--- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index 9ec5d24031f..fd198898d2f 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -36,15 +36,15 @@ namespace grpc { -void AuthMetadataProcessorAyncWrapper::Destroy(void* wrapper) { - auto* w = static_cast(wrapper); +void AuthMetadataProcessorAsyncWrapper::Destroy(void* wrapper) { + auto* w = static_cast(wrapper); delete w; } -void AuthMetadataProcessorAyncWrapper::Process( +void AuthMetadataProcessorAsyncWrapper::Process( void* wrapper, grpc_auth_context* context, const grpc_metadata* md, size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) { - auto* w = static_cast(wrapper); + auto* w = static_cast(wrapper); if (!w->processor_) { // Early exit. cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_OK, nullptr); @@ -52,8 +52,8 @@ void AuthMetadataProcessorAyncWrapper::Process( } if (w->processor_->IsBlocking()) { w->thread_pool_->Add([w, context, md, num_md, cb, user_data] { - w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md, - cb, user_data); + w->AuthMetadataProcessorAsyncWrapper::InvokeProcessor(context, md, num_md, + cb, user_data); }); } else { // invoke directly. @@ -61,7 +61,7 @@ void AuthMetadataProcessorAyncWrapper::Process( } } -void AuthMetadataProcessorAyncWrapper::InvokeProcessor( +void AuthMetadataProcessorAsyncWrapper::InvokeProcessor( grpc_auth_context* context, const grpc_metadata* md, size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) { AuthMetadataProcessor::InputMetadata metadata; @@ -104,10 +104,10 @@ int SecureServerCredentials::AddPortToServer(const std::string& addr, void SecureServerCredentials::SetAuthMetadataProcessor( const std::shared_ptr& processor) { - auto* wrapper = new grpc::AuthMetadataProcessorAyncWrapper(processor); + auto* wrapper = new grpc::AuthMetadataProcessorAsyncWrapper(processor); grpc_server_credentials_set_auth_metadata_processor( - creds_, {grpc::AuthMetadataProcessorAyncWrapper::Process, - grpc::AuthMetadataProcessorAyncWrapper::Destroy, wrapper}); + creds_, {grpc::AuthMetadataProcessorAsyncWrapper::Process, + grpc::AuthMetadataProcessorAsyncWrapper::Destroy, wrapper}); } std::shared_ptr SslServerCredentials( diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h index ca90a25f4f2..a62e51f2c24 100644 --- a/src/cpp/server/secure_server_credentials.h +++ b/src/cpp/server/secure_server_credentials.h @@ -35,7 +35,7 @@ namespace grpc { class SecureServerCredentials; -class AuthMetadataProcessorAyncWrapper final { +class AuthMetadataProcessorAsyncWrapper final { public: static void Destroy(void* wrapper); @@ -43,7 +43,7 @@ class AuthMetadataProcessorAyncWrapper final { const grpc_metadata* md, size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data); - explicit AuthMetadataProcessorAyncWrapper( + explicit AuthMetadataProcessorAsyncWrapper( const std::shared_ptr& processor) : processor_(processor) { if (processor && processor->IsBlocking()) { @@ -78,7 +78,7 @@ class SecureServerCredentials final : public ServerCredentials { SecureServerCredentials* AsSecureServerCredentials() override { return this; } grpc_server_credentials* creds_; - std::unique_ptr processor_; + std::unique_ptr processor_; }; } // namespace grpc From e8d9f222f551427d4c65cc37002de70f16982664 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 5 Dec 2023 15:05:31 -0800 Subject: [PATCH 4/4] [server] Make SetRegisteredMethodOnMetadata a method, not a static member fn (#35221) As part of the call-v3 work I'll be making a call to this code via a different path shortly, and separating the C-style callback piece out helps that work Closes #35221 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35221 from ctiller:cg-registered-method 4b6d80ee7f202cb083e2a3cb9811ce2d6a2cd269 PiperOrigin-RevId: 588200784 --- src/core/lib/surface/server.cc | 22 ++++++++++++---------- src/core/lib/surface/server.h | 3 +-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index a56f58d98e4..037e2b59e0b 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -1331,7 +1331,10 @@ void Server::ChannelData::InitTransport(RefCountedPtr server, op->set_accept_stream = true; op->set_accept_stream_fn = AcceptStream; if (IsRegisteredMethodLookupInTransportEnabled()) { - op->set_registered_method_matcher_fn = SetRegisteredMethodOnMetadata; + op->set_registered_method_matcher_fn = [](void* arg, + ClientMetadata* metadata) { + static_cast(arg)->SetRegisteredMethodOnMetadata(*metadata); + }; } // op->set_registered_method_matcher_fn = Registered op->set_accept_stream_user_data = this; @@ -1387,30 +1390,29 @@ Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( } void Server::ChannelData::SetRegisteredMethodOnMetadata( - void* arg, ServerMetadata* metadata) { - auto* chand = static_cast(arg); - auto* authority = metadata->get_pointer(HttpAuthorityMetadata()); + ClientMetadata& metadata) { + auto* authority = metadata.get_pointer(HttpAuthorityMetadata()); if (authority == nullptr) { - authority = metadata->get_pointer(HostMetadata()); + authority = metadata.get_pointer(HostMetadata()); if (authority == nullptr) { // Authority not being set is an RPC error. return; } } - auto* path = metadata->get_pointer(HttpPathMetadata()); + auto* path = metadata.get_pointer(HttpPathMetadata()); if (path == nullptr) { // Path not being set would result in an RPC error. return; } ChannelRegisteredMethod* method; if (!IsRegisteredMethodsMapEnabled()) { - method = chand->GetRegisteredMethod(authority->c_slice(), path->c_slice()); + method = GetRegisteredMethod(authority->c_slice(), path->c_slice()); } else { - method = chand->GetRegisteredMethod(authority->as_string_view(), - path->as_string_view()); + method = GetRegisteredMethod(authority->as_string_view(), + path->as_string_view()); } // insert in metadata - metadata->Set(GrpcRegisteredMethod(), method); + metadata.Set(GrpcRegisteredMethod(), method); } void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/, diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index f5999bcda30..4e181c8bb37 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -268,8 +268,7 @@ class Server : public InternallyRefCounted, static void AcceptStream(void* arg, Transport* /*transport*/, const void* transport_server_data); - static void SetRegisteredMethodOnMetadata(void* arg, - ServerMetadata* metadata); + void SetRegisteredMethodOnMetadata(ClientMetadata& metadata); void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_);