Merge branch 'grpc:master' into build-skip-ios

pull/35226/head
John Cormie 1 year ago committed by GitHub
commit 408b57bd6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/core/ext/filters/client_channel/client_channel.cc
  2. 1
      src/core/ext/filters/client_channel/retry_filter.cc
  3. 2
      src/core/ext/filters/deadline/deadline_filter.cc
  4. 6
      src/core/lib/channel/channel_stack.h
  5. 1
      src/core/lib/channel/connected_channel.cc
  6. 203
      src/core/lib/channel/promise_based_filter.h
  7. 3
      src/core/lib/surface/call_trace.cc
  8. 23
      src/core/lib/surface/server.cc
  9. 3
      src/core/lib/surface/server.h
  10. 112
      src/core/lib/transport/transport.h
  11. 20
      src/cpp/server/secure_server_credentials.cc
  12. 6
      src/cpp/server/secure_server_credentials.h
  13. 4
      test/core/channel/channel_stack_builder_test.cc
  14. 1
      test/core/channel/channel_stack_test.cc
  15. 1
      test/core/end2end/tests/filter_causes_close.cc
  16. 2
      test/core/end2end/tests/filter_context.cc
  17. 2
      test/core/end2end/tests/filter_init_fails.cc
  18. 1
      test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc
  19. 1
      test/core/end2end/tests/retry_recv_message_replay.cc
  20. 1
      test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
  21. 1
      test/core/end2end/tests/retry_send_op_fails.cc
  22. 1
      test/core/end2end/tests/retry_transparent_goaway.cc
  23. 1
      test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
  24. 6
      test/core/surface/channel_init_test.cc
  25. 1
      test/core/transport/chttp2/streams_not_seen_test.cc
  26. 8
      test/core/xds/xds_channel_stack_modifier_test.cc

@ -407,6 +407,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
const grpc_channel_filter ClientChannel::kFilterVtableWithPromises = { const grpc_channel_filter ClientChannel::kFilterVtableWithPromises = {
ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch, ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch,
ClientChannel::MakeCallPromise, ClientChannel::MakeCallPromise,
/* init_call: */ nullptr,
ClientChannel::StartTransportOp, ClientChannel::StartTransportOp,
sizeof(ClientChannel::FilterBasedCallData), sizeof(ClientChannel::FilterBasedCallData),
ClientChannel::FilterBasedCallData::Init, ClientChannel::FilterBasedCallData::Init,
@ -423,6 +424,7 @@ const grpc_channel_filter ClientChannel::kFilterVtableWithPromises = {
const grpc_channel_filter ClientChannel::kFilterVtableWithoutPromises = { const grpc_channel_filter ClientChannel::kFilterVtableWithoutPromises = {
ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch, ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch,
nullptr, nullptr,
/* init_call: */ nullptr,
ClientChannel::StartTransportOp, ClientChannel::StartTransportOp,
sizeof(ClientChannel::FilterBasedCallData), sizeof(ClientChannel::FilterBasedCallData),
ClientChannel::FilterBasedCallData::Init, ClientChannel::FilterBasedCallData::Init,
@ -570,6 +572,7 @@ class DynamicTerminationFilter::CallData {
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = { const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
DynamicTerminationFilter::CallData::StartTransportStreamOpBatch, DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
DynamicTerminationFilter::MakeCallPromise, DynamicTerminationFilter::MakeCallPromise,
/* init_call: */ nullptr,
DynamicTerminationFilter::StartTransportOp, DynamicTerminationFilter::StartTransportOp,
sizeof(DynamicTerminationFilter::CallData), sizeof(DynamicTerminationFilter::CallData),
DynamicTerminationFilter::CallData::Init, DynamicTerminationFilter::CallData::Init,

@ -143,6 +143,7 @@ const RetryMethodConfig* RetryFilter::GetRetryPolicy(
const grpc_channel_filter RetryFilter::kVtable = { const grpc_channel_filter RetryFilter::kVtable = {
RetryFilter::LegacyCallData::StartTransportStreamOpBatch, RetryFilter::LegacyCallData::StartTransportStreamOpBatch,
nullptr, nullptr,
/* init_call: */ nullptr,
RetryFilter::StartTransportOp, RetryFilter::StartTransportOp,
sizeof(RetryFilter::LegacyCallData), sizeof(RetryFilter::LegacyCallData),
RetryFilter::LegacyCallData::Init, RetryFilter::LegacyCallData::Init,

@ -343,6 +343,7 @@ const grpc_channel_filter grpc_client_deadline_filter = {
grpc_core::NextPromiseFactory next_promise_factory) { grpc_core::NextPromiseFactory next_promise_factory) {
return next_promise_factory(std::move(call_args)); return next_promise_factory(std::move(call_args));
}, },
/* init_call: */ nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(grpc_deadline_state), sizeof(grpc_deadline_state),
deadline_init_call_elem, deadline_init_call_elem,
@ -368,6 +369,7 @@ const grpc_channel_filter grpc_server_deadline_filter = {
} }
return next_promise_factory(std::move(call_args)); return next_promise_factory(std::move(call_args));
}, },
/* init_call: */ nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(server_call_data), sizeof(server_call_data),
deadline_init_call_elem, deadline_init_call_elem,

@ -128,6 +128,12 @@ struct grpc_channel_filter {
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> (*make_call_promise)( grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> (*make_call_promise)(
grpc_channel_element* elem, grpc_core::CallArgs call_args, grpc_channel_element* elem, grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory); grpc_core::NextPromiseFactory next_promise_factory);
// Register interceptors into a call.
// If this is non-null it may be used in preference to make_call_promise.
// There is an on-going migration to move all filters to providing this, and
// then to drop start_transport_stream_op_batch.
void (*init_call)(grpc_channel_element* elem,
grpc_core::CallSpineInterface* call_spine);
// Called to handle channel level operations - e.g. new calls, or transport // Called to handle channel level operations - e.g. new calls, or transport
// closure. // closure.
// See grpc_channel_next_op on how to call the next element in the stack // See grpc_channel_next_op on how to call the next element in the stack

@ -857,6 +857,7 @@ grpc_channel_filter MakeConnectedFilter() {
return { return {
connected_channel_start_transport_stream_op_batch, connected_channel_start_transport_stream_op_batch,
make_call_promise != nullptr ? make_call_wrapper : nullptr, make_call_promise != nullptr ? make_call_wrapper : nullptr,
/* init_call: */ nullptr,
connected_channel_start_transport_op, connected_channel_start_transport_op,
sizeof(call_data), sizeof(call_data),
connected_channel_init_call_elem, connected_channel_init_call_elem,

@ -345,15 +345,77 @@ inline auto RunCall(void (Derived::Call::*fn)(ClientMetadata& md,
} }
inline void InterceptClientToServerMessage(const NoInterceptor*, void*, inline void InterceptClientToServerMessage(const NoInterceptor*, void*,
CallArgs&) {} const CallArgs&) {}
inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
template <typename Derived>
inline void InterceptClientInitialMetadata(
void (Derived::Call::*fn)(ClientMetadata& md), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call](ClientMetadataHandle md) {
call->OnClientInitialMetadata(*md);
return md;
});
}
template <typename Derived>
inline void InterceptClientInitialMetadata(
void (Derived::Call::*fn)(ClientMetadata& md, Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call, channel](ClientMetadataHandle md) {
call->OnClientInitialMetadata(*md, channel);
return md;
});
}
template <typename Derived>
inline void InterceptClientInitialMetadata(
ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine,
call](ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto return_md = call->OnClientInitialMetadata(*md);
if (return_md == nullptr) return std::move(md);
return call_spine->Cancel(std::move(return_md));
});
}
template <typename Derived>
inline void InterceptClientInitialMetadata(
ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md,
Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine, call, channel](
ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto return_md = call->OnClientInitialMetadata(*md, channel);
if (return_md == nullptr) return std::move(md);
return call_spine->Cancel(std::move(return_md));
});
}
template <typename CallArgs>
inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, inline void InterceptServerInitialMetadata(const NoInterceptor*, void*,
CallArgs&) {} const CallArgs&) {}
template <typename Derived> template <typename Derived>
inline void InterceptServerInitialMetadata( inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&), void (Derived::Call::*fn)(ServerMetadata&),
FilterCallData<Derived>* call_data, CallArgs& call_args) { FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_args.server_initial_metadata->InterceptAndMap( call_args.server_initial_metadata->InterceptAndMap(
[call_data](ServerMetadataHandle md) { [call_data](ServerMetadataHandle md) {
@ -365,7 +427,7 @@ inline void InterceptServerInitialMetadata(
template <typename Derived> template <typename Derived>
inline void InterceptServerInitialMetadata( inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&), absl::Status (Derived::Call::*fn)(ServerMetadata&),
FilterCallData<Derived>* call_data, CallArgs& call_args) { FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_args.server_initial_metadata->InterceptAndMap( call_args.server_initial_metadata->InterceptAndMap(
[call_data]( [call_data](
@ -379,8 +441,69 @@ inline void InterceptServerInitialMetadata(
}); });
} }
template <typename CallArgs>
inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) {
call->OnServerInitialMetadata(*md);
return md;
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, call_spine](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerInitialMetadata(*md);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
});
}
inline void InterceptServerToClientMessage(const NoInterceptor*, void*, inline void InterceptServerToClientMessage(const NoInterceptor*, void*,
CallArgs&) {} const CallArgs&) {}
inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
template <typename Derived>
inline void InterceptServerTrailingMetadata(
void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) {
call->OnServerTrailingMetadata(*md);
return md;
});
}
template <typename Derived>
inline void InterceptServerTrailingMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerTrailingMetadata(*md);
if (status.ok()) return std::move(md);
return ServerMetadataFromStatus(status);
});
}
template <typename Derived> template <typename Derived>
absl::enable_if_t<std::is_empty<FilterCallData<Derived>>::value, absl::enable_if_t<std::is_empty<FilterCallData<Derived>>::value,
@ -449,6 +572,29 @@ MakeFilterCall(Derived* derived) {
template <typename Derived> template <typename Derived>
class ImplementChannelFilter : public ChannelFilter { class ImplementChannelFilter : public ChannelFilter {
public: public:
// Natively construct a v3 call.
void InitCall(CallSpineInterface* call_spine) {
auto* call = GetContext<Arena>()->ManagedNew<typename Derived::Call>();
promise_filter_detail::InterceptClientInitialMetadata(
&Derived::Call::OnClientInitialMetadata, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptClientToServerMessage(
&Derived::Call::OnClientToServerMessage, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptServerInitialMetadata(
&Derived::Call::OnServerInitialMetadata, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptServerToClientMessage(
&Derived::Call::OnServerToClientMessage, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptServerTrailingMetadata(
&Derived::Call::OnServerTrailingMetadata, call,
static_cast<Derived*>(this), call_spine);
}
// Polyfill for the original promise scheme.
// Allows writing v3 filters that work with v2 stacks.
// (and consequently also v1 stacks since we can polyfill back to that too).
ArenaPromise<ServerMetadataHandle> MakeCallPromise( ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) final { CallArgs call_args, NextPromiseFactory next_promise_factory) final {
auto* call = promise_filter_detail::MakeFilterCall<Derived>( auto* call = promise_filter_detail::MakeFilterCall<Derived>(
@ -1262,7 +1408,49 @@ struct ChannelFilterWithFlagsMethods {
// ChannelArgs channel_args, ChannelFilter::Args filter_args); // ChannelArgs channel_args, ChannelFilter::Args filter_args);
// }; // };
template <typename F, FilterEndpoint kEndpoint, uint8_t kFlags = 0> template <typename F, FilterEndpoint kEndpoint, uint8_t kFlags = 0>
absl::enable_if_t<std::is_base_of<ChannelFilter, F>::value, grpc_channel_filter> absl::enable_if_t<std::is_base_of<ChannelFilter, F>::value &&
!std::is_base_of<ImplementChannelFilter<F>, F>::value,
grpc_channel_filter>
MakePromiseBasedFilter(const char* name) {
using CallData = promise_filter_detail::CallData<kEndpoint>;
return grpc_channel_filter{
// start_transport_stream_op_batch
promise_filter_detail::BaseCallDataMethods::StartTransportStreamOpBatch,
// make_call_promise
promise_filter_detail::ChannelFilterMethods::MakeCallPromise,
nullptr,
// start_transport_op
promise_filter_detail::ChannelFilterMethods::StartTransportOp,
// sizeof_call_data
sizeof(CallData),
// init_call_elem
promise_filter_detail::CallDataFilterWithFlagsMethods<
CallData, kFlags>::InitCallElem,
// set_pollset_or_pollset_set
promise_filter_detail::BaseCallDataMethods::SetPollsetOrPollsetSet,
// destroy_call_elem
promise_filter_detail::CallDataFilterWithFlagsMethods<
CallData, kFlags>::DestroyCallElem,
// sizeof_channel_data
sizeof(F),
// init_channel_elem
promise_filter_detail::ChannelFilterWithFlagsMethods<
F, kFlags>::InitChannelElem,
// post_init_channel_elem
promise_filter_detail::ChannelFilterMethods::PostInitChannelElem,
// destroy_channel_elem
promise_filter_detail::ChannelFilterMethods::DestroyChannelElem,
// get_channel_info
promise_filter_detail::ChannelFilterMethods::GetChannelInfo,
// name
name,
};
}
template <typename F, FilterEndpoint kEndpoint, uint8_t kFlags = 0>
absl::enable_if_t<std::is_base_of<ImplementChannelFilter<F>, F>::value,
grpc_channel_filter>
MakePromiseBasedFilter(const char* name) { MakePromiseBasedFilter(const char* name) {
using CallData = promise_filter_detail::CallData<kEndpoint>; using CallData = promise_filter_detail::CallData<kEndpoint>;
@ -1271,6 +1459,9 @@ MakePromiseBasedFilter(const char* name) {
promise_filter_detail::BaseCallDataMethods::StartTransportStreamOpBatch, promise_filter_detail::BaseCallDataMethods::StartTransportStreamOpBatch,
// make_call_promise // make_call_promise
promise_filter_detail::ChannelFilterMethods::MakeCallPromise, promise_filter_detail::ChannelFilterMethods::MakeCallPromise,
[](grpc_channel_element* elem, CallSpineInterface* args) {
static_cast<F*>(elem->channel_data)->InitCall(args);
},
// start_transport_op // start_transport_op
promise_filter_detail::ChannelFilterMethods::StartTransportOp, promise_filter_detail::ChannelFilterMethods::StartTransportOp,
// sizeof_call_data // sizeof_call_data

@ -77,7 +77,8 @@ const grpc_channel_filter* PromiseTracingFilterFor(
return r; return r;
}; };
}, },
grpc_channel_next_op, /* sizeof_call_data: */ 0, /* init_call: */ nullptr, grpc_channel_next_op,
/* sizeof_call_data: */ 0,
// init_call_elem: // init_call_elem:
[](grpc_call_element*, const grpc_call_element_args*) { [](grpc_call_element*, const grpc_call_element_args*) {
return absl::OkStatus(); return absl::OkStatus();

@ -757,6 +757,7 @@ class ChannelBroadcaster {
const grpc_channel_filter Server::kServerTopFilter = { const grpc_channel_filter Server::kServerTopFilter = {
Server::CallData::StartTransportStreamOpBatch, Server::CallData::StartTransportStreamOpBatch,
Server::ChannelData::MakeCallPromise, Server::ChannelData::MakeCallPromise,
/* init_call: */ nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(Server::CallData), sizeof(Server::CallData),
Server::CallData::InitCallElement, Server::CallData::InitCallElement,
@ -1330,7 +1331,10 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
op->set_accept_stream = true; op->set_accept_stream = true;
op->set_accept_stream_fn = AcceptStream; op->set_accept_stream_fn = AcceptStream;
if (IsRegisteredMethodLookupInTransportEnabled()) { if (IsRegisteredMethodLookupInTransportEnabled()) {
op->set_registered_method_matcher_fn = SetRegisteredMethodOnMetadata; op->set_registered_method_matcher_fn = [](void* arg,
ClientMetadata* metadata) {
static_cast<ChannelData*>(arg)->SetRegisteredMethodOnMetadata(*metadata);
};
} }
// op->set_registered_method_matcher_fn = Registered // op->set_registered_method_matcher_fn = Registered
op->set_accept_stream_user_data = this; op->set_accept_stream_user_data = this;
@ -1386,30 +1390,29 @@ Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod(
} }
void Server::ChannelData::SetRegisteredMethodOnMetadata( void Server::ChannelData::SetRegisteredMethodOnMetadata(
void* arg, ServerMetadata* metadata) { ClientMetadata& metadata) {
auto* chand = static_cast<Server::ChannelData*>(arg); auto* authority = metadata.get_pointer(HttpAuthorityMetadata());
auto* authority = metadata->get_pointer(HttpAuthorityMetadata());
if (authority == nullptr) { if (authority == nullptr) {
authority = metadata->get_pointer(HostMetadata()); authority = metadata.get_pointer(HostMetadata());
if (authority == nullptr) { if (authority == nullptr) {
// Authority not being set is an RPC error. // Authority not being set is an RPC error.
return; return;
} }
} }
auto* path = metadata->get_pointer(HttpPathMetadata()); auto* path = metadata.get_pointer(HttpPathMetadata());
if (path == nullptr) { if (path == nullptr) {
// Path not being set would result in an RPC error. // Path not being set would result in an RPC error.
return; return;
} }
ChannelRegisteredMethod* method; ChannelRegisteredMethod* method;
if (!IsRegisteredMethodsMapEnabled()) { if (!IsRegisteredMethodsMapEnabled()) {
method = chand->GetRegisteredMethod(authority->c_slice(), path->c_slice()); method = GetRegisteredMethod(authority->c_slice(), path->c_slice());
} else { } else {
method = chand->GetRegisteredMethod(authority->as_string_view(), method = GetRegisteredMethod(authority->as_string_view(),
path->as_string_view()); path->as_string_view());
} }
// insert in metadata // insert in metadata
metadata->Set(GrpcRegisteredMethod(), method); metadata.Set(GrpcRegisteredMethod(), method);
} }
void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/, void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/,

@ -268,8 +268,7 @@ class Server : public InternallyRefCounted<Server>,
static void AcceptStream(void* arg, Transport* /*transport*/, static void AcceptStream(void* arg, Transport* /*transport*/,
const void* transport_server_data); const void* transport_server_data);
static void SetRegisteredMethodOnMetadata(void* arg, void SetRegisteredMethodOnMetadata(ClientMetadata& metadata);
ServerMetadata* metadata);
void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_); void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_);

@ -55,6 +55,7 @@
#include "src/core/lib/promise/context.h" #include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/status.h" #include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/latch.h" #include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/party.h"
#include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/pipe.h"
#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
@ -228,6 +229,117 @@ struct CallArgs {
using NextPromiseFactory = using NextPromiseFactory =
std::function<ArenaPromise<ServerMetadataHandle>(CallArgs)>; std::function<ArenaPromise<ServerMetadataHandle>(CallArgs)>;
// The common middle part of a call - a reference is held by each of
// CallInitiator and CallHandler - which provide interfaces that are appropriate
// for each side of a call.
// The spine will ultimately host the pipes, filters, and context for one part
// of a call: ie top-half client channel, sub channel call, server call.
// TODO(ctiller): eventually drop this when we don't need to reference into
// legacy promise calls anymore
class CallSpineInterface {
public:
virtual ~CallSpineInterface() = default;
virtual Pipe<ClientMetadataHandle>& client_initial_metadata() = 0;
virtual Pipe<ServerMetadataHandle>& server_initial_metadata() = 0;
virtual Pipe<MessageHandle>& client_to_server_messages() = 0;
virtual Pipe<MessageHandle>& server_to_client_messages() = 0;
virtual Pipe<ServerMetadataHandle>& server_trailing_metadata() = 0;
// Cancel the call with the given metadata.
// Regarding the `MUST_USE_RESULT absl::nullopt_t`:
// Most cancellation calls right now happen in pipe interceptors;
// there `nullopt` indicates terminate processing of this pipe and close with
// error.
// It's convenient then to have the Cancel operation (setting the latch to
// terminate the call) be the last thing that occurs in a pipe interceptor,
// and this construction supports that (and has helped the author not write
// some bugs).
GRPC_MUST_USE_RESULT virtual absl::nullopt_t Cancel(
ServerMetadataHandle metadata) = 0;
virtual Party& party() = 0;
// Wrap a promise so that if it returns failure it automatically cancels
// the rest of the call.
// The resulting (returned) promise will resolve to Empty.
template <typename Promise>
auto CancelIfFails(Promise promise) {
GPR_DEBUG_ASSERT(Activity::current() == &party());
using P = promise_detail::PromiseLike<Promise>;
using ResultType = typename P::Result;
return Map(std::move(promise), [this](ResultType r) {
if (!IsStatusOk(r)) {
std::ignore = Cancel(StatusCast<ServerMetadataHandle>(std::move(r)));
}
return Empty{};
});
}
// Spawn a promise that returns Empty{} and save some boilerplate handling
// that detail.
template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
party().Spawn(name, std::move(promise_factory), [](Empty) {});
}
// Spawn a promise that returns some status-like type; if the status
// represents failure automatically cancel the rest of the call.
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
using FactoryType =
promise_detail::OncePromiseFactory<void, PromiseFactory>;
using PromiseType = typename FactoryType::Promise;
using ResultType = typename PromiseType::Result;
static_assert(
std::is_same<bool,
decltype(IsStatusOk(std::declval<ResultType>()))>::value,
"SpawnGuarded promise must return a status-like object");
party().Spawn(name, std::move(promise_factory), [this](ResultType r) {
if (!IsStatusOk(r)) {
std::ignore = Cancel(StatusCast<ServerMetadataHandle>(std::move(r)));
}
});
}
};
class CallSpine final : public CallSpineInterface {
public:
Pipe<ClientMetadataHandle>& client_initial_metadata() override {
return client_initial_metadata_;
}
Pipe<ServerMetadataHandle>& server_initial_metadata() override {
return server_initial_metadata_;
}
Pipe<MessageHandle>& client_to_server_messages() override {
return client_to_server_messages_;
}
Pipe<MessageHandle>& server_to_client_messages() override {
return server_to_client_messages_;
}
Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
return server_trailing_metadata_;
}
absl::nullopt_t Cancel(ServerMetadataHandle metadata) override {
GPR_DEBUG_ASSERT(Activity::current() == &party());
if (cancel_latch_.is_set()) return absl::nullopt;
cancel_latch_.Set(std::move(metadata));
return absl::nullopt;
}
Party& party() override { Crash("unimplemented"); }
private:
// Initial metadata from client to server
Pipe<ClientMetadataHandle> client_initial_metadata_;
// Initial metadata from server to client
Pipe<ServerMetadataHandle> server_initial_metadata_;
// Messages travelling from the application to the transport.
Pipe<MessageHandle> client_to_server_messages_;
// Messages travelling from the transport to the application.
Pipe<MessageHandle> server_to_client_messages_;
// Trailing metadata from server to client
Pipe<ServerMetadataHandle> server_trailing_metadata_;
// Latch that can be set to terminate the call
Latch<ServerMetadataHandle> cancel_latch_;
};
} // namespace grpc_core } // namespace grpc_core
// forward declarations // forward declarations

@ -36,15 +36,15 @@
namespace grpc { namespace grpc {
void AuthMetadataProcessorAyncWrapper::Destroy(void* wrapper) { void AuthMetadataProcessorAsyncWrapper::Destroy(void* wrapper) {
auto* w = static_cast<AuthMetadataProcessorAyncWrapper*>(wrapper); auto* w = static_cast<AuthMetadataProcessorAsyncWrapper*>(wrapper);
delete w; delete w;
} }
void AuthMetadataProcessorAyncWrapper::Process( void AuthMetadataProcessorAsyncWrapper::Process(
void* wrapper, grpc_auth_context* context, const grpc_metadata* md, void* wrapper, grpc_auth_context* context, const grpc_metadata* md,
size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) { size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) {
auto* w = static_cast<AuthMetadataProcessorAyncWrapper*>(wrapper); auto* w = static_cast<AuthMetadataProcessorAsyncWrapper*>(wrapper);
if (!w->processor_) { if (!w->processor_) {
// Early exit. // Early exit.
cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_OK, nullptr); cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_OK, nullptr);
@ -52,8 +52,8 @@ void AuthMetadataProcessorAyncWrapper::Process(
} }
if (w->processor_->IsBlocking()) { if (w->processor_->IsBlocking()) {
w->thread_pool_->Add([w, context, md, num_md, cb, user_data] { w->thread_pool_->Add([w, context, md, num_md, cb, user_data] {
w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md, w->AuthMetadataProcessorAsyncWrapper::InvokeProcessor(context, md, num_md,
cb, user_data); cb, user_data);
}); });
} else { } else {
// invoke directly. // invoke directly.
@ -61,7 +61,7 @@ void AuthMetadataProcessorAyncWrapper::Process(
} }
} }
void AuthMetadataProcessorAyncWrapper::InvokeProcessor( void AuthMetadataProcessorAsyncWrapper::InvokeProcessor(
grpc_auth_context* context, const grpc_metadata* md, size_t num_md, grpc_auth_context* context, const grpc_metadata* md, size_t num_md,
grpc_process_auth_metadata_done_cb cb, void* user_data) { grpc_process_auth_metadata_done_cb cb, void* user_data) {
AuthMetadataProcessor::InputMetadata metadata; AuthMetadataProcessor::InputMetadata metadata;
@ -104,10 +104,10 @@ int SecureServerCredentials::AddPortToServer(const std::string& addr,
void SecureServerCredentials::SetAuthMetadataProcessor( void SecureServerCredentials::SetAuthMetadataProcessor(
const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) { const std::shared_ptr<grpc::AuthMetadataProcessor>& processor) {
auto* wrapper = new grpc::AuthMetadataProcessorAyncWrapper(processor); auto* wrapper = new grpc::AuthMetadataProcessorAsyncWrapper(processor);
grpc_server_credentials_set_auth_metadata_processor( grpc_server_credentials_set_auth_metadata_processor(
creds_, {grpc::AuthMetadataProcessorAyncWrapper::Process, creds_, {grpc::AuthMetadataProcessorAsyncWrapper::Process,
grpc::AuthMetadataProcessorAyncWrapper::Destroy, wrapper}); grpc::AuthMetadataProcessorAsyncWrapper::Destroy, wrapper});
} }
std::shared_ptr<ServerCredentials> SslServerCredentials( std::shared_ptr<ServerCredentials> SslServerCredentials(

@ -35,7 +35,7 @@ namespace grpc {
class SecureServerCredentials; class SecureServerCredentials;
class AuthMetadataProcessorAyncWrapper final { class AuthMetadataProcessorAsyncWrapper final {
public: public:
static void Destroy(void* wrapper); static void Destroy(void* wrapper);
@ -43,7 +43,7 @@ class AuthMetadataProcessorAyncWrapper final {
const grpc_metadata* md, size_t num_md, const grpc_metadata* md, size_t num_md,
grpc_process_auth_metadata_done_cb cb, void* user_data); grpc_process_auth_metadata_done_cb cb, void* user_data);
explicit AuthMetadataProcessorAyncWrapper( explicit AuthMetadataProcessorAsyncWrapper(
const std::shared_ptr<AuthMetadataProcessor>& processor) const std::shared_ptr<AuthMetadataProcessor>& processor)
: processor_(processor) { : processor_(processor) {
if (processor && processor->IsBlocking()) { if (processor && processor->IsBlocking()) {
@ -78,7 +78,7 @@ class SecureServerCredentials final : public ServerCredentials {
SecureServerCredentials* AsSecureServerCredentials() override { return this; } SecureServerCredentials* AsSecureServerCredentials() override { return this; }
grpc_server_credentials* creds_; grpc_server_credentials* creds_;
std::unique_ptr<grpc::AuthMetadataProcessorAyncWrapper> processor_; std::unique_ptr<grpc::AuthMetadataProcessorAsyncWrapper> processor_;
}; };
} // namespace grpc } // namespace grpc

@ -62,8 +62,8 @@ const grpc_channel_filter* FilterNamed(const char* name) {
->emplace( ->emplace(
name, name,
new grpc_channel_filter{ new grpc_channel_filter{
grpc_call_next_op, nullptr, grpc_channel_next_op, 0, CallInitFunc, grpc_call_next_op, nullptr, nullptr, grpc_channel_next_op, 0,
grpc_call_stack_ignore_set_pollset_or_pollset_set, CallInitFunc, grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallDestroyFunc, 0, ChannelInitFunc, CallDestroyFunc, 0, ChannelInitFunc,
[](grpc_channel_stack*, grpc_channel_element*) {}, [](grpc_channel_stack*, grpc_channel_element*) {},
ChannelDestroyFunc, grpc_channel_next_get_info, name}) ChannelDestroyFunc, grpc_channel_next_get_info, name})

@ -83,6 +83,7 @@ TEST(ChannelStackTest, CreateChannelStack) {
const grpc_channel_filter filter = { const grpc_channel_filter filter = {
call_func, call_func,
nullptr, nullptr,
nullptr,
channel_func, channel_func,
sizeof(int), sizeof(int),
call_init_func, call_init_func,

@ -102,6 +102,7 @@ const grpc_channel_filter test_filter = {
return Immediate(ServerMetadataFromStatus( return Immediate(ServerMetadataFromStatus(
absl::PermissionDeniedError("Failure that's not preventable."))); absl::PermissionDeniedError("Failure that's not preventable.")));
}, },
nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(call_data), sizeof(call_data),
init_call_elem, init_call_elem,

@ -82,7 +82,7 @@ grpc_error_handle init_channel_elem(grpc_channel_element* /*elem*/,
void destroy_channel_elem(grpc_channel_element* /*elem*/) {} void destroy_channel_elem(grpc_channel_element* /*elem*/) {}
const grpc_channel_filter test_filter = { const grpc_channel_filter test_filter = {
start_transport_stream_op_batch, nullptr, grpc_channel_next_op, start_transport_stream_op_batch, nullptr, nullptr, grpc_channel_next_op,
sizeof(call_data), init_call_elem, sizeof(call_data), init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, 0, grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, 0,
init_channel_elem, grpc_channel_stack_no_post_init, destroy_channel_elem, init_channel_elem, grpc_channel_stack_no_post_init, destroy_channel_elem,

@ -80,7 +80,7 @@ const grpc_channel_filter test_filter = {
return Immediate(ServerMetadataFromStatus( return Immediate(ServerMetadataFromStatus(
absl::PermissionDeniedError("access denied"))); absl::PermissionDeniedError("access denied")));
}, },
grpc_channel_next_op, 0, init_call_elem, nullptr, grpc_channel_next_op, 0, init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, 0, grpc_call_stack_ignore_set_pollset_or_pollset_set, destroy_call_elem, 0,
init_channel_elem, grpc_channel_stack_no_post_init, destroy_channel_elem, init_channel_elem, grpc_channel_stack_no_post_init, destroy_channel_elem,
grpc_channel_next_get_info, grpc_channel_next_get_info,

@ -159,6 +159,7 @@ class FailSendOpsFilter {
grpc_channel_filter FailSendOpsFilter::kFilterVtable = { grpc_channel_filter FailSendOpsFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch, CallData::StartTransportStreamOpBatch,
nullptr, nullptr,
nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(CallData), sizeof(CallData),
CallData::Init, CallData::Init,

@ -109,6 +109,7 @@ class FailFirstSendOpFilter {
grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = { grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch, CallData::StartTransportStreamOpBatch,
nullptr, nullptr,
nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(CallData), sizeof(CallData),
CallData::Init, CallData::Init,

@ -104,6 +104,7 @@ class InjectStatusFilter {
grpc_channel_filter InjectStatusFilter::kFilterVtable = { grpc_channel_filter InjectStatusFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch, CallData::StartTransportStreamOpBatch,
nullptr, nullptr,
nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(CallData), sizeof(CallData),
CallData::Init, CallData::Init,

@ -110,6 +110,7 @@ class FailFirstCallFilter {
grpc_channel_filter FailFirstCallFilter::kFilterVtable = { grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch, CallData::StartTransportStreamOpBatch,
nullptr, nullptr,
nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(CallData), sizeof(CallData),
CallData::Init, CallData::Init,

@ -116,6 +116,7 @@ class FailFirstCallFilter {
grpc_channel_filter FailFirstCallFilter::kFilterVtable = { grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch, CallData::StartTransportStreamOpBatch,
nullptr, nullptr,
nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(CallData), sizeof(CallData),
CallData::Init, CallData::Init,

@ -115,6 +115,7 @@ class FailFirstTenCallsFilter {
grpc_channel_filter FailFirstTenCallsFilter::kFilterVtable = { grpc_channel_filter FailFirstTenCallsFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch, CallData::StartTransportStreamOpBatch,
nullptr, nullptr,
nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(CallData), sizeof(CallData),
CallData::Init, CallData::Init,

@ -35,9 +35,9 @@ const grpc_channel_filter* FilterNamed(const char* name) {
if (it != filters->end()) return it->second; if (it != filters->end()) return it->second;
return filters return filters
->emplace(name, ->emplace(name,
new grpc_channel_filter{nullptr, nullptr, nullptr, 0, nullptr, new grpc_channel_filter{nullptr, nullptr, nullptr, nullptr, 0,
nullptr, nullptr, 0, nullptr, nullptr, nullptr, nullptr, nullptr, 0, nullptr,
nullptr, nullptr, name}) nullptr, nullptr, nullptr, name})
.first->second; .first->second;
} }

@ -199,6 +199,7 @@ class TrailingMetadataRecordingFilter {
grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = { grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch, CallData::StartTransportStreamOpBatch,
nullptr, nullptr,
nullptr,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(CallData), sizeof(CallData),
CallData::Init, CallData::Init,

@ -95,11 +95,11 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) {
grpc_init(); grpc_init();
// Add 2 test filters to XdsChannelStackModifier // Add 2 test filters to XdsChannelStackModifier
const grpc_channel_filter test_filter_1 = { const grpc_channel_filter test_filter_1 = {
nullptr, nullptr, nullptr, 0, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, 0, nullptr, nullptr,
0, nullptr, nullptr, nullptr, nullptr, kTestFilter1}; nullptr, 0, nullptr, nullptr, nullptr, nullptr, kTestFilter1};
const grpc_channel_filter test_filter_2 = { const grpc_channel_filter test_filter_2 = {
nullptr, nullptr, nullptr, 0, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, 0, nullptr, nullptr,
0, nullptr, nullptr, nullptr, nullptr, kTestFilter2}; nullptr, 0, nullptr, nullptr, nullptr, nullptr, kTestFilter2};
auto channel_stack_modifier = MakeRefCounted<XdsChannelStackModifier>( auto channel_stack_modifier = MakeRefCounted<XdsChannelStackModifier>(
std::vector<const grpc_channel_filter*>{&test_filter_1, &test_filter_2}); std::vector<const grpc_channel_filter*>{&test_filter_1, &test_filter_2});
grpc_arg arg = channel_stack_modifier->MakeChannelArg(); grpc_arg arg = channel_stack_modifier->MakeChannelArg();

Loading…
Cancel
Save