diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index fd728b63d81..c80579719f1 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -233,6 +233,7 @@ class ClientChannel::CallData { const grpc_channel_filter ClientChannel::kFilterVtable = { ClientChannel::CallData::StartTransportStreamOpBatch, + nullptr, ClientChannel::StartTransportOp, sizeof(ClientChannel::CallData), ClientChannel::CallData::Init, @@ -383,6 +384,7 @@ class DynamicTerminationFilter::CallData { const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = { DynamicTerminationFilter::CallData::StartTransportStreamOpBatch, + nullptr, DynamicTerminationFilter::StartTransportOp, sizeof(DynamicTerminationFilter::CallData), DynamicTerminationFilter::CallData::Init, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc index fe64441f037..3be5a7f2786 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc @@ -132,6 +132,7 @@ static void clr_start_transport_stream_op_batch( const grpc_channel_filter grpc_client_load_reporting_filter = { clr_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(call_data), clr_init_call_elem, diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index 2561e12c992..b777b6d9fae 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -2541,6 +2541,7 @@ void RetryFilter::CallData::OnRetryTimerLocked(void* arg, const grpc_channel_filter kRetryFilterVtable = { RetryFilter::CallData::StartTransportStreamOpBatch, + nullptr, RetryFilter::StartTransportOp, sizeof(RetryFilter::CallData), RetryFilter::CallData::Init, diff --git a/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc b/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc index d540ae2e87f..714896096fc 100644 --- a/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc +++ b/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc @@ -123,6 +123,7 @@ void ServiceConfigChannelArgDestroyChannelElem(grpc_channel_element* elem) { const grpc_channel_filter ServiceConfigChannelArgFilter = { grpc_call_next_op, + nullptr, grpc_channel_next_op, sizeof(ServiceConfigChannelArgCallData), ServiceConfigChannelArgInitCallElem, diff --git a/src/core/ext/filters/client_idle/client_idle_filter.cc b/src/core/ext/filters/client_idle/client_idle_filter.cc index 6cd3d12ec70..df8fc07ab35 100644 --- a/src/core/ext/filters/client_idle/client_idle_filter.cc +++ b/src/core/ext/filters/client_idle/client_idle_filter.cc @@ -233,6 +233,7 @@ void CallData::Destroy(grpc_call_element* elem, const grpc_channel_filter grpc_client_idle_filter = { grpc_call_next_op, + nullptr, ChannelData::StartTransportOp, sizeof(CallData), CallData::Init, diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index 9bdd51fc240..6cca2a98174 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -338,6 +338,7 @@ static void deadline_server_start_transport_stream_op_batch( const grpc_channel_filter grpc_client_deadline_filter = { deadline_client_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(base_call_data), deadline_init_call_elem, @@ -352,6 +353,7 @@ const grpc_channel_filter grpc_client_deadline_filter = { const grpc_channel_filter grpc_server_deadline_filter = { deadline_server_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(server_call_data), deadline_init_call_elem, diff --git a/src/core/ext/filters/fault_injection/fault_injection_filter.cc b/src/core/ext/filters/fault_injection/fault_injection_filter.cc index 2f84c14bd0c..fce1a4fc9ef 100644 --- a/src/core/ext/filters/fault_injection/fault_injection_filter.cc +++ b/src/core/ext/filters/fault_injection/fault_injection_filter.cc @@ -471,6 +471,7 @@ void CallData::HijackedRecvTrailingMetadataReady(void* arg, extern const grpc_channel_filter FaultInjectionFilterVtable = { CallData::StartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index fe057687e2f..2ad4e87d597 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -505,6 +505,7 @@ static void http_client_destroy_channel_elem(grpc_channel_element* elem) { const grpc_channel_filter grpc_http_client_filter = { http_client_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(call_data), http_client_init_call_elem, diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index 151c85db027..2d7572b8f4f 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -443,6 +443,7 @@ void CompressDestroyChannelElem(grpc_channel_element* elem) { const grpc_channel_filter grpc_message_compress_filter = { CompressStartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(CallData), CompressInitCallElem, diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc index 06391f362c4..36a54335a1e 100644 --- a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc @@ -365,6 +365,7 @@ void DecompressDestroyChannelElem(grpc_channel_element* elem) { const grpc_channel_filter MessageDecompressFilter = { DecompressStartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(CallData), DecompressInitCallElem, diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 8ccff256e80..f39547bfec4 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -423,6 +423,7 @@ static void hs_destroy_channel_elem(grpc_channel_element* /*elem*/) {} const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(call_data), hs_init_call_elem, diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index c06872847d9..16927e2ab1c 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -523,6 +523,7 @@ static void max_age_destroy_channel_elem(grpc_channel_element* elem) { const grpc_channel_filter grpc_max_age_filter = { grpc_call_next_op, + nullptr, grpc_channel_next_op, 0, /* sizeof_call_data */ max_age_init_call_elem, diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index 5fb92c45403..51fb768b55f 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -330,6 +330,7 @@ static void message_size_destroy_channel_elem(grpc_channel_element* elem) { const grpc_channel_filter grpc_message_size_filter = { message_size_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(call_data), message_size_init_call_elem, diff --git a/src/core/ext/filters/rbac/rbac_filter.cc b/src/core/ext/filters/rbac/rbac_filter.cc index 1435a049e20..b3a97dc2a16 100644 --- a/src/core/ext/filters/rbac/rbac_filter.cc +++ b/src/core/ext/filters/rbac/rbac_filter.cc @@ -109,6 +109,7 @@ void RbacFilter::CallData::RecvInitialMetadataReady(void* user_data, const grpc_channel_filter RbacFilter::kFilterVtable = { RbacFilter::CallData::StartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(RbacFilter::CallData), RbacFilter::CallData::Init, diff --git a/src/core/ext/filters/server_config_selector/server_config_selector_filter.cc b/src/core/ext/filters/server_config_selector/server_config_selector_filter.cc index 8954391cc4d..9aad912dbb8 100644 --- a/src/core/ext/filters/server_config_selector/server_config_selector_filter.cc +++ b/src/core/ext/filters/server_config_selector/server_config_selector_filter.cc @@ -255,6 +255,7 @@ void CallData::MaybeResumeRecvTrailingMetadataReady() { const grpc_channel_filter kServerConfigSelectorFilter = { CallData::StartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/src/core/ext/transport/binder/transport/binder_transport.cc b/src/core/ext/transport/binder/transport/binder_transport.cc index 3206bec8e7f..c92a4f5bf7f 100644 --- a/src/core/ext/transport/binder/transport/binder_transport.cc +++ b/src/core/ext/transport/binder/transport/binder_transport.cc @@ -689,6 +689,7 @@ static grpc_endpoint* get_endpoint(grpc_transport*) { static const grpc_transport_vtable vtable = {sizeof(grpc_binder_stream), "binder", init_stream, + nullptr, set_pollset, set_pollset_set, perform_stream_op, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index df40ad3b7c6..efc0809c395 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -3161,6 +3161,7 @@ static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) { static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), "chttp2", init_stream, + nullptr, set_pollset, set_pollset_set, perform_stream_op, diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index f25d8e0f5a7..64fe89bf53f 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -1483,6 +1483,7 @@ static const grpc_transport_vtable grpc_cronet_vtable = { sizeof(stream_obj), "cronet_http", init_stream, + nullptr, set_pollset_do_nothing, set_pollset_set_do_nothing, perform_stream_op, diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 3ad4f504bb9..af096d99bca 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -1212,9 +1212,11 @@ void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/, grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; } const grpc_transport_vtable inproc_vtable = { - sizeof(inproc_stream), "inproc", init_stream, - set_pollset, set_pollset_set, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport, + sizeof(inproc_stream), "inproc", + init_stream, nullptr, + set_pollset, set_pollset_set, + perform_stream_op, perform_transport_op, + destroy_stream, destroy_transport, get_endpoint}; /******************************************************************************* diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 0f3504f85b7..beb6367ae9c 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -48,6 +48,8 @@ #include +#include + #include #include #include @@ -57,6 +59,7 @@ #include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" typedef struct grpc_channel_element grpc_channel_element; @@ -109,6 +112,19 @@ struct grpc_channel_filter { See grpc_call_next_op on how to call the next element in the stack */ void (*start_transport_stream_op_batch)(grpc_call_element* elem, grpc_transport_stream_op_batch* op); + /* Create a promise to execute one call. + If this is non-null, it may be used in preference to + start_transport_stream_op_batch. + If this is used in preference to start_transport_stream_op_batch, the + following can be omitted also: + - calling init_call_elem, destroy_call_elem, set_pollset_or_pollset_set + - allocation of memory for call data + There is an on-going migration to move all filters to providing this, and + then to drop start_transport_stream_op_batch. */ + grpc_core::ArenaPromise (*make_call_promise)( + grpc_channel_element* elem, + grpc_core::ClientInitialMetadata initial_metadata, + grpc_core::NextPromiseFactory next_promise_factory); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index 2a03c9cfcb3..c16a46f5819 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -203,6 +203,7 @@ static void connected_channel_get_channel_info( const grpc_channel_filter grpc_connected_filter = { connected_channel_start_transport_stream_op_batch, + nullptr, connected_channel_start_transport_op, sizeof(call_data), connected_channel_init_call_elem, diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index d1bee3ebf68..18d4db4dd80 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -33,59 +33,6 @@ namespace grpc_core { -namespace promise_filter_detail { -class BaseCallData; -}; - -// Small unowned "handle" type to ensure one accessor at a time to metadata. -// The focus here is to get promises to use the syntax we'd like - we'll -// probably substitute some other smart pointer later. -template -class MetadataHandle { - public: - MetadataHandle() = default; - - MetadataHandle(const MetadataHandle&) = delete; - MetadataHandle& operator=(const MetadataHandle&) = delete; - - MetadataHandle(MetadataHandle&& other) noexcept : handle_(other.handle_) { - other.handle_ = nullptr; - } - MetadataHandle& operator=(MetadataHandle&& other) noexcept { - handle_ = other.handle_; - other.handle_ = nullptr; - return *this; - } - - T* operator->() const { return handle_; } - bool has_value() const { return handle_ != nullptr; } - - static MetadataHandle TestOnlyWrap(T* p) { return MetadataHandle(p); } - - private: - friend class promise_filter_detail::BaseCallData; - - explicit MetadataHandle(T* handle) : handle_(handle) {} - T* Unwrap() { - T* result = handle_; - handle_ = nullptr; - return result; - } - - T* handle_ = nullptr; -}; - -// Trailing metadata type -// TODO(ctiller): This should be a bespoke instance of MetadataMap<> -using TrailingMetadata = MetadataHandle; - -// Client initial metadata type -// TODO(ctiller): This should be a bespoke instance of MetadataMap<> -using ClientInitialMetadata = MetadataHandle; - -using NextPromiseFactory = - std::function(ClientInitialMetadata)>; - namespace promise_filter_detail { // Call data shared between all implementations of promise-based filters. @@ -456,6 +403,13 @@ grpc_channel_filter MakePromiseBasedFilter() { [](grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { static_cast(elem->call_data)->StartBatch(batch); }, + // make_call_promise + [](grpc_channel_element* elem, ClientInitialMetadata initial_metadata, + NextPromiseFactory next_promise_factory) { + return static_cast(elem->channel_data) + ->MakeCallPromise(std::move(initial_metadata), + std::move(next_promise_factory)); + }, // start_transport_op - for now unsupported grpc_channel_next_op, // sizeof_call_data diff --git a/src/core/lib/security/authorization/sdk_server_authz_filter.cc b/src/core/lib/security/authorization/sdk_server_authz_filter.cc index 89d1694f6e1..cb9969523e9 100644 --- a/src/core/lib/security/authorization/sdk_server_authz_filter.cc +++ b/src/core/lib/security/authorization/sdk_server_authz_filter.cc @@ -159,6 +159,7 @@ void SdkServerAuthzFilter::CallData::RecvInitialMetadataReady( const grpc_channel_filter SdkServerAuthzFilter::kFilterVtable = { SdkServerAuthzFilter::CallData::StartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(SdkServerAuthzFilter::CallData), SdkServerAuthzFilter::CallData::Init, diff --git a/src/core/lib/security/transport/client_auth_filter.cc b/src/core/lib/security/transport/client_auth_filter.cc index 718350a4a2e..0965dbef0f0 100644 --- a/src/core/lib/security/transport/client_auth_filter.cc +++ b/src/core/lib/security/transport/client_auth_filter.cc @@ -481,6 +481,7 @@ static void client_auth_destroy_channel_elem(grpc_channel_element* elem) { const grpc_channel_filter grpc_client_auth_filter = { client_auth_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(call_data), client_auth_init_call_elem, diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 2f0a375a3c6..185228521b5 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -324,6 +324,7 @@ static void server_auth_destroy_channel_elem(grpc_channel_element* elem) { const grpc_channel_filter grpc_server_auth_filter = { server_auth_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(call_data), server_auth_init_call_elem, diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index 88dd3c5dc5e..9436bf91d25 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -156,6 +156,7 @@ grpc_arg MakeLameClientErrorArg(grpc_error_handle* error) { const grpc_channel_filter grpc_lame_filter = { grpc_core::lame_start_transport_stream_op_batch, + nullptr, grpc_core::lame_start_transport_op, sizeof(grpc_core::CallData), grpc_core::lame_init_call_elem, diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index da38f98900a..26fe43f0f58 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -498,6 +498,7 @@ class ChannelBroadcaster { const grpc_channel_filter Server::kServerTopFilter = { Server::CallData::StartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(Server::CallData), Server::CallData::InitCallElement, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 577e439dc4d..a7fa0753a4f 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -30,6 +30,7 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/byte_stream.h" @@ -42,6 +43,62 @@ #define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2 #define GRPC_PROTOCOL_VERSION_MIN_MINOR 1 +namespace grpc_core { +// TODO(ctiller): eliminate once MetadataHandle is constructable directly. +namespace promise_filter_detail { +class BaseCallData; +} + +// Small unowned "handle" type to ensure one accessor at a time to metadata. +// The focus here is to get promises to use the syntax we'd like - we'll +// probably substitute some other smart pointer later. +template +class MetadataHandle { + public: + MetadataHandle() = default; + + MetadataHandle(const MetadataHandle&) = delete; + MetadataHandle& operator=(const MetadataHandle&) = delete; + + MetadataHandle(MetadataHandle&& other) noexcept : handle_(other.handle_) { + other.handle_ = nullptr; + } + MetadataHandle& operator=(MetadataHandle&& other) noexcept { + handle_ = other.handle_; + other.handle_ = nullptr; + return *this; + } + + T* operator->() const { return handle_; } + bool has_value() const { return handle_ != nullptr; } + + static MetadataHandle TestOnlyWrap(T* p) { return MetadataHandle(p); } + + private: + friend class promise_filter_detail::BaseCallData; + + explicit MetadataHandle(T* handle) : handle_(handle) {} + T* Unwrap() { + T* result = handle_; + handle_ = nullptr; + return result; + } + + T* handle_ = nullptr; +}; + +// Trailing metadata type +// TODO(ctiller): This should be a bespoke instance of MetadataMap<> +using TrailingMetadata = MetadataHandle; + +// Client initial metadata type +// TODO(ctiller): This should be a bespoke instance of MetadataMap<> +using ClientInitialMetadata = MetadataHandle; + +using NextPromiseFactory = + std::function(ClientInitialMetadata)>; +} // namespace grpc_core + /* forward declarations */ typedef struct grpc_transport grpc_transport; @@ -353,6 +410,14 @@ typedef struct grpc_transport_op { void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport, const void* server_data) = nullptr; void* set_accept_stream_user_data = nullptr; + /** set the callback for accepting new streams based upon promises; + this is a permanent callback, unlike the other one-shot closures. + If true, the callback is set to set_make_promise_fn, with its + user_data argument set to set_make_promise_data */ + bool set_make_promise = false; + void (*set_make_promise_fn)(void* user_data, grpc_transport* transport, + const void* server_data) = nullptr; + void* set_make_promise_user_data = nullptr; /** add this transport to a pollset */ grpc_pollset* bind_pollset = nullptr; /** add this transport to a pollset_set */ diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 526cc1b1bac..11e99935c66 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -36,6 +36,19 @@ typedef struct grpc_transport_vtable { grpc_stream_refcount* refcount, const void* server_data, grpc_core::Arena* arena); + /* Create a promise to execute one client call. + If this is non-null, it may be used in preference to + perform_stream_op. + If this is used in preference to perform_stream_op, the + following can be omitted also: + - calling init_stream, destroy_stream, set_pollset, set_pollset_set + - allocation of memory for call data (sizeof_stream may be ignored) + There is an on-going migration to move all filters to providing this, and + then to drop perform_stream_op. */ + grpc_core::ArenaPromise (*make_call_promise)( + grpc_transport* self, grpc_core::ClientInitialMetadata initial_metadata, + grpc_core::NextPromiseFactory next_promise_factory); + /* implementation of grpc_transport_set_pollset */ void (*set_pollset)(grpc_transport* self, grpc_stream* stream, grpc_pollset* pollset); diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 6e0830f977b..b58af7ad0a8 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -328,6 +328,7 @@ void RegisterChannelFilter( using FilterType = internal::ChannelFilter; static const grpc_channel_filter filter = { FilterType::StartTransportStreamOpBatch, + nullptr, FilterType::StartTransportOp, FilterType::call_data_size, FilterType::InitCallElement, diff --git a/test/core/channel/channel_stack_builder_test.cc b/test/core/channel/channel_stack_builder_test.cc index d9d5ffa9e0e..b3d0a16dd6f 100644 --- a/test/core/channel/channel_stack_builder_test.cc +++ b/test/core/channel/channel_stack_builder_test.cc @@ -75,6 +75,7 @@ TEST(ChannelStackBuilderTest, ReplaceFilter) { const grpc_channel_filter replacement_filter = { grpc_call_next_op, + nullptr, grpc_channel_next_op, 0, CallInitFunc, @@ -88,6 +89,7 @@ const grpc_channel_filter replacement_filter = { const grpc_channel_filter original_filter = { grpc_call_next_op, + nullptr, grpc_channel_next_op, 0, CallInitFunc, diff --git a/test/core/channel/channel_stack_test.cc b/test/core/channel/channel_stack_test.cc index 0f2861b4829..6e930a0ee67 100644 --- a/test/core/channel/channel_stack_test.cc +++ b/test/core/channel/channel_stack_test.cc @@ -77,6 +77,7 @@ static void free_call(void* arg, grpc_error_handle /*error*/) { static void test_create_channel_stack(void) { const grpc_channel_filter filter = { call_func, + nullptr, channel_func, sizeof(int), call_init_func, diff --git a/test/core/end2end/tests/filter_causes_close.cc b/test/core/end2end/tests/filter_causes_close.cc index 8944c1a5337..6076bee2f2a 100644 --- a/test/core/end2end/tests/filter_causes_close.cc +++ b/test/core/end2end/tests/filter_causes_close.cc @@ -235,6 +235,7 @@ static void destroy_channel_elem(grpc_channel_element* /*elem*/) {} static const grpc_channel_filter test_filter = { start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/test/core/end2end/tests/filter_context.cc b/test/core/end2end/tests/filter_context.cc index a071a2ffe2c..1c78570876d 100644 --- a/test/core/end2end/tests/filter_context.cc +++ b/test/core/end2end/tests/filter_context.cc @@ -258,6 +258,7 @@ static void destroy_channel_elem(grpc_channel_element* /*elem*/) {} static const grpc_channel_filter test_filter = { start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/test/core/end2end/tests/filter_init_fails.cc b/test/core/end2end/tests/filter_init_fails.cc index d252c2d245d..336f3631fe9 100644 --- a/test/core/end2end/tests/filter_init_fails.cc +++ b/test/core/end2end/tests/filter_init_fails.cc @@ -442,6 +442,7 @@ static void destroy_channel_elem(grpc_channel_element* /*elem*/) {} static const grpc_channel_filter test_filter = { grpc_call_next_op, + nullptr, grpc_channel_next_op, 0, init_call_elem, diff --git a/test/core/end2end/tests/filter_latency.cc b/test/core/end2end/tests/filter_latency.cc index d87db6f40c5..f5d51d8c883 100644 --- a/test/core/end2end/tests/filter_latency.cc +++ b/test/core/end2end/tests/filter_latency.cc @@ -276,6 +276,7 @@ static void destroy_channel_elem(grpc_channel_element* /*elem*/) {} static const grpc_channel_filter test_client_filter = { grpc_call_next_op, + nullptr, grpc_channel_next_op, 0, init_call_elem, @@ -289,6 +290,7 @@ static const grpc_channel_filter test_client_filter = { static const grpc_channel_filter test_server_filter = { grpc_call_next_op, + nullptr, grpc_channel_next_op, 0, init_call_elem, diff --git a/test/core/end2end/tests/filter_status_code.cc b/test/core/end2end/tests/filter_status_code.cc index 7d36b2619d3..d31193afa53 100644 --- a/test/core/end2end/tests/filter_status_code.cc +++ b/test/core/end2end/tests/filter_status_code.cc @@ -323,6 +323,7 @@ static void destroy_channel_elem(grpc_channel_element* /*elem*/) {} static const grpc_channel_filter test_client_filter = { grpc_call_next_op, + nullptr, grpc_channel_next_op, sizeof(final_status_data), init_call_elem, @@ -336,6 +337,7 @@ static const grpc_channel_filter test_client_filter = { static const grpc_channel_filter test_server_filter = { server_start_transport_stream_op_batch, + nullptr, grpc_channel_next_op, sizeof(final_status_data), init_call_elem, diff --git a/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc b/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc index 201af2d6ba7..1d5fde05ddf 100644 --- a/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc +++ b/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc @@ -293,6 +293,7 @@ class FailSendOpsFilter { grpc_channel_filter FailSendOpsFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc b/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc index 5a4097298fa..7e7411a7010 100644 --- a/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc +++ b/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc @@ -325,6 +325,7 @@ class InjectStatusFilter { grpc_channel_filter InjectStatusFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/end2end/tests/retry_send_op_fails.cc b/test/core/end2end/tests/retry_send_op_fails.cc index ab716a20f27..f3012235d50 100644 --- a/test/core/end2end/tests/retry_send_op_fails.cc +++ b/test/core/end2end/tests/retry_send_op_fails.cc @@ -344,6 +344,7 @@ class FailFirstSendOpFilter { grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, + nullptr, grpc_channel_next_op, sizeof(CallData), CallData::Init, diff --git a/test/core/xds/xds_channel_stack_modifier_test.cc b/test/core/xds/xds_channel_stack_modifier_test.cc index d1fd905a067..db3bff022f1 100644 --- a/test/core/xds/xds_channel_stack_modifier_test.cc +++ b/test/core/xds/xds_channel_stack_modifier_test.cc @@ -69,11 +69,11 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) { grpc_init(); // Add 2 test filters to XdsChannelStackModifier const grpc_channel_filter test_filter_1 = { - nullptr, nullptr, 0, nullptr, nullptr, nullptr, - 0, nullptr, nullptr, nullptr, kTestFilter1}; + nullptr, nullptr, nullptr, 0, nullptr, nullptr, + nullptr, 0, nullptr, nullptr, nullptr, kTestFilter1}; const grpc_channel_filter test_filter_2 = { - nullptr, nullptr, 0, nullptr, nullptr, nullptr, - 0, nullptr, nullptr, nullptr, kTestFilter2}; + nullptr, nullptr, nullptr, 0, nullptr, nullptr, + nullptr, 0, nullptr, nullptr, nullptr, kTestFilter2}; auto channel_stack_modifier = MakeRefCounted( std::vector{&test_filter_1, &test_filter_2}); grpc_arg arg = channel_stack_modifier->MakeChannelArg(); @@ -114,11 +114,11 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertionAfterCensus) { grpc_init(); // Add 2 test filters to XdsChannelStackModifier const grpc_channel_filter test_filter_1 = { - nullptr, nullptr, 0, nullptr, nullptr, nullptr, - 0, nullptr, nullptr, nullptr, kTestFilter1}; + nullptr, nullptr, nullptr, 0, nullptr, nullptr, + nullptr, 0, nullptr, nullptr, nullptr, kTestFilter1}; const grpc_channel_filter test_filter_2 = { - nullptr, nullptr, 0, nullptr, nullptr, nullptr, - 0, nullptr, nullptr, nullptr, kTestFilter2}; + nullptr, nullptr, nullptr, 0, nullptr, nullptr, + nullptr, 0, nullptr, nullptr, nullptr, kTestFilter2}; auto channel_stack_modifier = MakeRefCounted( std::vector{&test_filter_1, &test_filter_2}); grpc_arg arg = channel_stack_modifier->MakeChannelArg(); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index bf1de891f45..2cd7f7daf54 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -384,17 +384,13 @@ void DestroyChannelElem(grpc_channel_element* /*elem*/) {} void GetChannelInfo(grpc_channel_element* /*elem*/, const grpc_channel_info* /*channel_info*/) {} -static const grpc_channel_filter phony_filter = {StartTransportStreamOp, - StartTransportOp, - 0, - InitCallElem, - SetPollsetOrPollsetSet, - DestroyCallElem, - 0, - InitChannelElem, - DestroyChannelElem, - GetChannelInfo, - "phony_filter"}; +static const grpc_channel_filter phony_filter = { + StartTransportStreamOp, nullptr, + StartTransportOp, 0, + InitCallElem, SetPollsetOrPollsetSet, + DestroyCallElem, 0, + InitChannelElem, DestroyChannelElem, + GetChannelInfo, "phony_filter"}; } // namespace phony_filter @@ -441,11 +437,17 @@ void Destroy(grpc_transport* /*self*/) {} /* implementation of grpc_transport_get_endpoint */ grpc_endpoint* GetEndpoint(grpc_transport* /*self*/) { return nullptr; } -static const grpc_transport_vtable phony_transport_vtable = { - 0, "phony_http2", InitStream, - SetPollset, SetPollsetSet, PerformStreamOp, - PerformOp, DestroyStream, Destroy, - GetEndpoint}; +static const grpc_transport_vtable phony_transport_vtable = {0, + "phony_http2", + InitStream, + nullptr, + SetPollset, + SetPollsetSet, + PerformStreamOp, + PerformOp, + DestroyStream, + Destroy, + GetEndpoint}; static grpc_transport phony_transport = {&phony_transport_vtable}; @@ -682,17 +684,12 @@ void GetChannelInfo(grpc_channel_element* /*elem*/, const grpc_channel_info* /*channel_info*/) {} static const grpc_channel_filter isolated_call_filter = { - StartTransportStreamOp, - StartTransportOp, - sizeof(call_data), - InitCallElem, - SetPollsetOrPollsetSet, - DestroyCallElem, - 0, - InitChannelElem, - DestroyChannelElem, - GetChannelInfo, - "isolated_call_filter"}; + StartTransportStreamOp, nullptr, + StartTransportOp, sizeof(call_data), + InitCallElem, SetPollsetOrPollsetSet, + DestroyCallElem, 0, + InitChannelElem, DestroyChannelElem, + GetChannelInfo, "isolated_call_filter"}; } // namespace isolated_call_filter class IsolatedCallFixture : public TrackCounters {