HTTP Client Filter --> promises (#29031)

* Remove idempotent/cacheable requests

* more cleanup

* bump core version

* fix

* fix

* fix

* review feedback

* fixes

* fix

* remove more

* objc

* fix

* fix

* fix

* scrub

* introduce call args

* bs

* x

* Automated change: Fix sanity tests

* fix

* roughing out

* push/pull impl

* comment

* prove out new combinator

* Simplify naming

* fix

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* progress

* builds

* Automated change: Fix sanity tests

* progress

* tweak

* merge

* progress

* fix

* first test passes

* progress

* fix

* repair name

* fix

* small fix

* small fix

* properly stop call combiner

* fix allocation in this benchmark

* fix

* fix

* fix merge

* fix bad merge

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/29165/head
Craig Tiller 3 years ago committed by GitHub
parent 6823087eb5
commit 41debbf1a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 2
      build_autogenerated.yaml
  3. 2
      gRPC-C++.podspec
  4. 2
      gRPC-Core.podspec
  5. 1
      grpc.gemspec
  6. 1
      package.xml
  7. 278
      src/core/ext/filters/http/client/http_client_filter.cc
  8. 24
      src/core/ext/filters/http/client/http_client_filter.h
  9. 4
      src/core/ext/filters/http/http_filters_plugin.cc
  10. 576
      src/core/lib/channel/promise_based_filter.cc
  11. 42
      src/core/lib/channel/promise_based_filter.h
  12. 4
      src/core/lib/promise/call_push_pull.h
  13. 3
      src/core/lib/promise/detail/status.h
  14. 2
      src/core/lib/transport/transport.h
  15. 5
      test/cpp/microbenchmarks/bm_call_create.cc
  16. 1
      tools/doxygen/Doxyfile.c++.internal
  17. 1
      tools/doxygen/Doxyfile.core.internal

@ -2746,10 +2746,12 @@ grpc_cc_library(
],
language = "c++",
deps = [
"call_push_pull",
"config",
"gpr_base",
"grpc_base",
"grpc_message_size_filter",
"seq",
"slice",
],
)

@ -815,6 +815,7 @@ libs:
- src/core/lib/matchers/matchers.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/call_push_pull.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
@ -1990,6 +1991,7 @@ libs:
- src/core/lib/json/json_util.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/call_push_pull.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h

2
gRPC-C++.podspec generated

@ -783,6 +783,7 @@ Pod::Spec.new do |s|
'src/core/lib/profiling/timers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/call_push_pull.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
@ -1588,6 +1589,7 @@ Pod::Spec.new do |s|
'src/core/lib/profiling/timers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/call_push_pull.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',

2
gRPC-Core.podspec generated

@ -1286,6 +1286,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/call_push_pull.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
@ -2186,6 +2187,7 @@ Pod::Spec.new do |s|
'src/core/lib/profiling/timers.h',
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/call_push_pull.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',

1
grpc.gemspec generated

@ -1205,6 +1205,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/activity.cc )
s.files += %w( src/core/lib/promise/activity.h )
s.files += %w( src/core/lib/promise/arena_promise.h )
s.files += %w( src/core/lib/promise/call_push_pull.h )
s.files += %w( src/core/lib/promise/context.h )
s.files += %w( src/core/lib/promise/detail/basic_seq.h )
s.files += %w( src/core/lib/promise/detail/promise_factory.h )

1
package.xml generated

@ -1185,6 +1185,7 @@
<file baseinstalldir="/" name="src/core/lib/promise/activity.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/activity.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/arena_promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/call_push_pull.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/context.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/basic_seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/promise_factory.h" role="src" />

@ -33,218 +33,60 @@
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/b64.h"
#include "src/core/lib/promise/call_push_pull.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport_impl.h"
#define EXPECTED_CONTENT_TYPE "application/grpc"
#define EXPECTED_CONTENT_TYPE_LENGTH (sizeof(EXPECTED_CONTENT_TYPE) - 1)
namespace grpc_core {
static void recv_initial_metadata_ready(void* user_data,
grpc_error_handle error);
static void recv_trailing_metadata_ready(void* user_data,
grpc_error_handle error);
const grpc_channel_filter HttpClientFilter::kFilter =
MakePromiseBasedFilter<HttpClientFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata>("http-client");
namespace {
struct call_data {
call_data(grpc_call_element* elem, const grpc_call_element_args& args)
: call_combiner(args.call_combiner) {
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready,
::recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
::recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
}
~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); }
grpc_core::CallCombiner* call_combiner;
// State for handling recv_initial_metadata ops.
grpc_metadata_batch* recv_initial_metadata;
grpc_error_handle recv_initial_metadata_error = GRPC_ERROR_NONE;
grpc_closure* original_recv_initial_metadata_ready = nullptr;
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* original_recv_trailing_metadata_ready;
grpc_closure recv_trailing_metadata_ready;
grpc_error_handle recv_trailing_metadata_error = GRPC_ERROR_NONE;
bool seen_recv_trailing_metadata_ready = false;
};
struct channel_data {
grpc_core::HttpSchemeMetadata::ValueType static_scheme;
grpc_core::Slice user_agent;
};
} // namespace
static grpc_error_handle client_filter_incoming_metadata(
grpc_metadata_batch* b) {
if (auto* status = b->get_pointer(grpc_core::HttpStatusMetadata())) {
absl::Status CheckServerMetadata(ServerMetadata* b) {
if (auto* status = b->get_pointer(HttpStatusMetadata())) {
/* If both gRPC status and HTTP status are provided in the response, we
* should prefer the gRPC status code, as mentioned in
* https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md.
*/
const grpc_status_code* grpc_status =
b->get_pointer(grpc_core::GrpcStatusMetadata());
const grpc_status_code* grpc_status = b->get_pointer(GrpcStatusMetadata());
if (grpc_status != nullptr || *status == 200) {
b->Remove(grpc_core::HttpStatusMetadata());
b->Remove(HttpStatusMetadata());
} else {
std::string msg =
absl::StrCat("Received http2 header with status: ", *status);
grpc_error_handle e = grpc_error_set_str(
grpc_error_set_int(
grpc_error_set_str(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Received http2 :status header with non-200 OK status"),
GRPC_ERROR_STR_VALUE, std::to_string(*status)),
GRPC_ERROR_INT_GRPC_STATUS,
return absl::Status(
static_cast<absl::StatusCode>(
grpc_http2_status_to_grpc_status(*status)),
GRPC_ERROR_STR_GRPC_MESSAGE, msg);
return e;
absl::StrCat("Received http2 header with status: ", *status));
}
}
if (grpc_core::Slice* grpc_message =
b->get_pointer(grpc_core::GrpcMessageMetadata())) {
*grpc_message =
grpc_core::PermissivePercentDecodeSlice(std::move(*grpc_message));
}
b->Remove(grpc_core::ContentTypeMetadata());
return GRPC_ERROR_NONE;
}
static void recv_initial_metadata_ready(void* user_data,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
error = client_filter_incoming_metadata(calld->recv_initial_metadata);
calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
} else {
(void)GRPC_ERROR_REF(error);
}
grpc_closure* closure = calld->original_recv_initial_metadata_ready;
calld->original_recv_initial_metadata_ready = nullptr;
if (calld->seen_recv_trailing_metadata_ready) {
GRPC_CALL_COMBINER_START(
calld->call_combiner, &calld->recv_trailing_metadata_ready,
calld->recv_trailing_metadata_error, "continue recv_trailing_metadata");
}
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
}
static void recv_trailing_metadata_ready(void* user_data,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->original_recv_initial_metadata_ready != nullptr) {
calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
calld->seen_recv_trailing_metadata_ready = true;
GRPC_CALL_COMBINER_STOP(calld->call_combiner,
"deferring recv_trailing_metadata_ready until "
"after recv_initial_metadata_ready");
return;
}
if (error == GRPC_ERROR_NONE) {
error = client_filter_incoming_metadata(calld->recv_trailing_metadata);
} else {
(void)GRPC_ERROR_REF(error);
}
error = grpc_error_add_child(
error, GRPC_ERROR_REF(calld->recv_initial_metadata_error));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready, error);
}
static void http_client_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
channel_data* channeld = static_cast<channel_data*>(elem->channel_data);
GPR_TIMER_SCOPE("http_client_start_transport_stream_op_batch", 0);
if (batch->recv_initial_metadata) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata =
batch->payload->recv_initial_metadata.recv_initial_metadata;
calld->original_recv_initial_metadata_ready =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
if (batch->recv_trailing_metadata) {
/* substitute our callback for the higher callback */
calld->recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->original_recv_trailing_metadata_ready =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
if (batch->send_initial_metadata) {
/* Send : prefixed headers, which have to be before any application
layer headers. */
batch->payload->send_initial_metadata.send_initial_metadata->Set(
grpc_core::HttpMethodMetadata(), grpc_core::HttpMethodMetadata::kPost);
batch->payload->send_initial_metadata.send_initial_metadata->Set(
grpc_core::HttpSchemeMetadata(), channeld->static_scheme);
batch->payload->send_initial_metadata.send_initial_metadata->Set(
grpc_core::TeMetadata(), grpc_core::TeMetadata::kTrailers);
batch->payload->send_initial_metadata.send_initial_metadata->Set(
grpc_core::ContentTypeMetadata(),
grpc_core::ContentTypeMetadata::kApplicationGrpc);
batch->payload->send_initial_metadata.send_initial_metadata->Set(
grpc_core::UserAgentMetadata(), channeld->user_agent.Ref());
if (Slice* grpc_message = b->get_pointer(GrpcMessageMetadata())) {
*grpc_message = PermissivePercentDecodeSlice(std::move(*grpc_message));
}
grpc_call_next_op(elem, batch);
b->Remove(ContentTypeMetadata());
return absl::OkStatus();
}
/* Constructor for call_data */
static grpc_error_handle http_client_init_call_elem(
grpc_call_element* elem, const grpc_call_element_args* args) {
new (elem->call_data) call_data(elem, *args);
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
static void http_client_destroy_call_elem(
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->~call_data();
}
static grpc_core::HttpSchemeMetadata::ValueType scheme_from_args(
const grpc_channel_args* args) {
HttpSchemeMetadata::ValueType SchemeFromArgs(const grpc_channel_args* args) {
if (args != nullptr) {
for (size_t i = 0; i < args->num_args; ++i) {
if (args->args[i].type == GRPC_ARG_STRING &&
0 == strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME)) {
grpc_core::HttpSchemeMetadata::ValueType scheme =
grpc_core::HttpSchemeMetadata::Parse(
args->args[i].value.string,
[](absl::string_view, const grpc_core::Slice&) {});
if (scheme != grpc_core::HttpSchemeMetadata::kInvalid) return scheme;
HttpSchemeMetadata::ValueType scheme = HttpSchemeMetadata::Parse(
args->args[i].value.string, [](absl::string_view, const Slice&) {});
if (scheme != HttpSchemeMetadata::kInvalid) return scheme;
}
}
}
return grpc_core::HttpSchemeMetadata::kHttp;
return HttpSchemeMetadata::kHttp;
}
static grpc_core::Slice user_agent_from_args(const grpc_channel_args* args,
const char* transport_name) {
Slice UserAgentFromArgs(const grpc_channel_args* args,
const char* transport_name) {
std::vector<std::string> user_agent_fields;
for (size_t i = 0; args && i < args->num_args; i++) {
@ -274,39 +116,51 @@ static grpc_core::Slice user_agent_from_args(const grpc_channel_args* args,
}
std::string user_agent_string = absl::StrJoin(user_agent_fields, " ");
return grpc_core::Slice::FromCopiedString(user_agent_string.c_str());
return Slice::FromCopiedString(user_agent_string.c_str());
}
} // namespace
/* Constructor for channel_data */
static grpc_error_handle http_client_init_channel_elem(
grpc_channel_element* elem, grpc_channel_element_args* args) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (chand) channel_data();
GPR_ASSERT(!args->is_last);
auto* transport = grpc_channel_args_find_pointer<grpc_transport>(
args->channel_args, GRPC_ARG_TRANSPORT);
GPR_ASSERT(transport != nullptr);
chand->static_scheme = scheme_from_args(args->channel_args);
chand->user_agent = grpc_core::Slice(
user_agent_from_args(args->channel_args, transport->vtable->name));
return GRPC_ERROR_NONE;
ArenaPromise<ServerMetadataHandle> HttpClientFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto& md = call_args.client_initial_metadata;
md->Set(HttpMethodMetadata(), HttpMethodMetadata::kPost);
md->Set(HttpSchemeMetadata(), scheme_);
md->Set(TeMetadata(), TeMetadata::kTrailers);
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
md->Set(UserAgentMetadata(), user_agent_.Ref());
auto* read_latch = GetContext<Arena>()->New<Latch<ServerMetadata*>>();
auto* write_latch =
absl::exchange(call_args.server_initial_metadata, read_latch);
return CallPushPull(
Seq(next_promise_factory(std::move(call_args)),
[](ServerMetadataHandle md) -> ServerMetadataHandle {
auto r = CheckServerMetadata(md.get());
if (!r.ok()) return ServerMetadataHandle(r);
return md;
}),
[]() { return absl::OkStatus(); },
Seq(read_latch->Wait(),
[write_latch](ServerMetadata** md) -> absl::Status {
auto r =
*md == nullptr ? absl::OkStatus() : CheckServerMetadata(*md);
write_latch->Set(*md);
return r;
}));
}
/* Destructor for channel data */
static void http_client_destroy_channel_elem(grpc_channel_element* elem) {
static_cast<channel_data*>(elem->channel_data)->~channel_data();
HttpClientFilter::HttpClientFilter(HttpSchemeMetadata::ValueType scheme,
Slice user_agent)
: scheme_(scheme), user_agent_(std::move(user_agent)) {}
absl::StatusOr<HttpClientFilter> HttpClientFilter::Create(
const grpc_channel_args* args, ChannelFilter::Args) {
auto* transport =
grpc_channel_args_find_pointer<grpc_transport>(args, GRPC_ARG_TRANSPORT);
GPR_ASSERT(transport != nullptr);
return HttpClientFilter(SchemeFromArgs(args),
UserAgentFromArgs(args, transport->vtable->name));
}
const grpc_channel_filter grpc_http_client_filter = {
http_client_start_transport_stream_op_batch,
nullptr,
grpc_channel_next_op,
sizeof(call_data),
http_client_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
http_client_destroy_call_elem,
sizeof(channel_data),
http_client_init_channel_elem,
http_client_destroy_channel_elem,
grpc_channel_next_get_info,
"http-client"};
} // namespace grpc_core

@ -21,8 +21,28 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/promise_based_filter.h"
/* Processes metadata on the client side for HTTP2 transports */
extern const grpc_channel_filter grpc_http_client_filter;
namespace grpc_core {
class HttpClientFilter : public ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<HttpClientFilter> Create(
const grpc_channel_args* args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
HttpClientFilter(HttpSchemeMetadata::ValueType scheme, Slice user_agent);
HttpSchemeMetadata::ValueType scheme_;
Slice user_agent_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_CLIENT_HTTP_CLIENT_FILTER_H */

@ -83,8 +83,8 @@ void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, &MessageDecompressFilter);
optional(GRPC_SERVER_CHANNEL, kMinimalStackHasDecompression,
GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, &MessageDecompressFilter);
required(GRPC_CLIENT_SUBCHANNEL, &grpc_http_client_filter);
required(GRPC_CLIENT_DIRECT_CHANNEL, &grpc_http_client_filter);
required(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter);
required(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter);
required(GRPC_SERVER_CHANNEL, &grpc_http_server_filter);
}
} // namespace grpc_core

@ -16,6 +16,8 @@
#include "src/core/lib/channel/promise_based_filter.h"
#include <cstdlib>
#include "src/core/lib/channel/channel_stack.h"
namespace grpc_core {
@ -24,6 +26,25 @@ namespace promise_filter_detail {
///////////////////////////////////////////////////////////////////////////////
// BaseCallData
BaseCallData::BaseCallData(grpc_call_element* elem,
const grpc_call_element_args* args, uint8_t flags)
: call_stack_(args->call_stack),
elem_(elem),
arena_(args->arena),
call_combiner_(args->call_combiner),
deadline_(args->deadline),
context_(args->context) {
if (flags & kFilterExaminesServerInitialMetadata) {
server_initial_metadata_latch_ = arena_->New<Latch<ServerMetadata*>>();
}
}
BaseCallData::~BaseCallData() {
if (server_initial_metadata_latch_ != nullptr) {
server_initial_metadata_latch_->~Latch();
}
}
// We don't form ActivityPtr's to this type, and consequently don't need
// Orphan().
void BaseCallData::Orphan() { abort(); }
@ -52,23 +73,285 @@ void BaseCallData::Drop() { GRPC_CALL_STACK_UNREF(call_stack_, "waker"); }
///////////////////////////////////////////////////////////////////////////////
// ClientCallData
struct ClientCallData::RecvInitialMetadata final {
enum State {
// Initial state; no op seen
kInitial,
// No op seen, but we have a latch that would like to modify it when we do
kGotLatch,
// Hooked, no latch yet
kHookedWaitingForLatch,
// Hooked, latch seen
kHookedAndGotLatch,
// Got the callback, haven't set latch yet
kCompleteWaitingForLatch,
// Got the callback and got the latch
kCompleteAndGotLatch,
// Got the callback and set the latch
kCompleteAndSetLatch,
// Called the original callback
kResponded,
};
State state = kInitial;
grpc_closure* original_on_ready = nullptr;
grpc_closure on_ready;
grpc_metadata_batch* metadata = nullptr;
Latch<ServerMetadata*>* server_initial_metadata_publisher = nullptr;
};
class ClientCallData::PollContext {
public:
explicit PollContext(ClientCallData* self) : self_(self) {
GPR_ASSERT(self_->poll_ctx_ == nullptr);
self_->poll_ctx_ = this;
scoped_activity_.Init(self_);
have_scoped_activity_ = true;
}
PollContext(const PollContext&) = delete;
PollContext& operator=(const PollContext&) = delete;
void Run() {
GPR_ASSERT(have_scoped_activity_);
repoll_ = false;
if (self_->server_initial_metadata_latch() != nullptr) {
switch (self_->recv_initial_metadata_->state) {
case RecvInitialMetadata::kInitial:
case RecvInitialMetadata::kGotLatch:
case RecvInitialMetadata::kHookedWaitingForLatch:
case RecvInitialMetadata::kHookedAndGotLatch:
case RecvInitialMetadata::kCompleteWaitingForLatch:
case RecvInitialMetadata::kResponded:
break;
case RecvInitialMetadata::kCompleteAndGotLatch:
self_->recv_initial_metadata_->state =
RecvInitialMetadata::kCompleteAndSetLatch;
self_->recv_initial_metadata_->server_initial_metadata_publisher->Set(
self_->recv_initial_metadata_->metadata);
ABSL_FALLTHROUGH_INTENDED;
case RecvInitialMetadata::kCompleteAndSetLatch: {
Poll<ServerMetadata**> p =
self_->server_initial_metadata_latch()->Wait()();
if (ServerMetadata*** ppp = absl::get_if<ServerMetadata**>(&p)) {
ServerMetadata* md = **ppp;
if (self_->recv_initial_metadata_->metadata != md) {
*self_->recv_initial_metadata_->metadata = std::move(*md);
}
self_->recv_initial_metadata_->state =
RecvInitialMetadata::kResponded;
call_closures_.Add(
absl::exchange(self_->recv_initial_metadata_->original_on_ready,
nullptr),
GRPC_ERROR_NONE,
"wake_inside_combiner:recv_initial_metadata_ready");
}
} break;
}
}
if (self_->recv_trailing_state_ == RecvTrailingState::kCancelled ||
self_->recv_trailing_state_ == RecvTrailingState::kResponded) {
return;
}
switch (self_->send_initial_state_) {
case SendInitialState::kQueued:
case SendInitialState::kForwarded: {
// Poll the promise once since we're waiting for it.
Poll<ServerMetadataHandle> poll = self_->promise_();
if (auto* r = absl::get_if<ServerMetadataHandle>(&poll)) {
auto* md = UnwrapMetadata(std::move(*r));
bool destroy_md = true;
if (self_->recv_trailing_state_ == RecvTrailingState::kComplete) {
if (self_->recv_trailing_metadata_ != md) {
*self_->recv_trailing_metadata_ = std::move(*md);
} else {
destroy_md = false;
}
self_->recv_trailing_state_ = RecvTrailingState::kResponded;
call_closures_.Add(
absl::exchange(self_->original_recv_trailing_metadata_ready_,
nullptr),
GRPC_ERROR_NONE, "wake_inside_combiner:recv_trailing_ready:1");
} else {
GPR_ASSERT(*md->get_pointer(GrpcStatusMetadata()) !=
GRPC_STATUS_OK);
grpc_error_handle error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"early return from promise based filter"),
GRPC_ERROR_INT_GRPC_STATUS,
*md->get_pointer(GrpcStatusMetadata()));
if (auto* message = md->get_pointer(GrpcMessageMetadata())) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
message->as_string_view());
}
GRPC_ERROR_UNREF(self_->cancelled_error_);
self_->cancelled_error_ = GRPC_ERROR_REF(error);
if (self_->recv_initial_metadata_ != nullptr) {
switch (self_->recv_initial_metadata_->state) {
case RecvInitialMetadata::kInitial:
case RecvInitialMetadata::kGotLatch:
case RecvInitialMetadata::kHookedWaitingForLatch:
case RecvInitialMetadata::kHookedAndGotLatch:
case RecvInitialMetadata::kResponded:
break;
case RecvInitialMetadata::kCompleteWaitingForLatch:
case RecvInitialMetadata::kCompleteAndGotLatch:
case RecvInitialMetadata::kCompleteAndSetLatch:
self_->recv_initial_metadata_->state =
RecvInitialMetadata::kResponded;
call_closures_.Add(
absl::exchange(
self_->recv_initial_metadata_->original_on_ready,
nullptr),
GRPC_ERROR_REF(error),
"wake_inside_combiner:recv_initial_metadata_ready");
}
}
if (self_->send_initial_state_ == SendInitialState::kQueued) {
self_->send_initial_state_ = SendInitialState::kCancelled;
cancel_send_initial_metadata_error_ = error;
} else {
GPR_ASSERT(
self_->recv_trailing_state_ == RecvTrailingState::kInitial ||
self_->recv_trailing_state_ == RecvTrailingState::kForwarded);
self_->call_combiner()->Cancel(GRPC_ERROR_REF(error));
forward_batch_ =
grpc_make_transport_stream_op(GRPC_CLOSURE_CREATE(
[](void* p, grpc_error_handle) {
GRPC_CALL_COMBINER_STOP(static_cast<CallCombiner*>(p),
"finish_cancel");
},
self_->call_combiner(), nullptr));
forward_batch_->cancel_stream = true;
forward_batch_->payload->cancel_stream.cancel_error = error;
}
self_->recv_trailing_state_ = RecvTrailingState::kCancelled;
}
if (destroy_md) {
md->~grpc_metadata_batch();
}
scoped_activity_.Destroy();
have_scoped_activity_ = false;
self_->promise_ = ArenaPromise<ServerMetadataHandle>();
}
} break;
case SendInitialState::kInitial:
case SendInitialState::kCancelled:
// If we get a response without sending anything, we just propagate
// that up. (note: that situation isn't possible once we finish the
// promise transition).
if (self_->recv_trailing_state_ == RecvTrailingState::kComplete) {
self_->recv_trailing_state_ = RecvTrailingState::kResponded;
call_closures_.Add(
absl::exchange(self_->original_recv_trailing_metadata_ready_,
nullptr),
GRPC_ERROR_NONE, "wake_inside_combiner:recv_trailing_ready:2");
}
break;
}
}
~PollContext() {
self_->poll_ctx_ = nullptr;
if (have_scoped_activity_) scoped_activity_.Destroy();
GRPC_CALL_STACK_REF(self_->call_stack(), "finish_poll");
bool in_combiner = true;
if (call_closures_.size() != 0) {
if (forward_batch_ != nullptr) {
call_closures_.RunClosuresWithoutYielding(self_->call_combiner());
} else {
in_combiner = false;
call_closures_.RunClosures(self_->call_combiner());
}
}
if (forward_batch_ != nullptr) {
GPR_ASSERT(in_combiner);
in_combiner = false;
forward_send_initial_metadata_ = false;
grpc_call_next_op(self_->elem(), forward_batch_);
}
if (cancel_send_initial_metadata_error_ != GRPC_ERROR_NONE) {
GPR_ASSERT(in_combiner);
forward_send_initial_metadata_ = false;
in_combiner = false;
grpc_transport_stream_op_batch_finish_with_failure(
absl::exchange(self_->send_initial_metadata_batch_, nullptr),
cancel_send_initial_metadata_error_, self_->call_combiner());
}
if (absl::exchange(forward_send_initial_metadata_, false)) {
GPR_ASSERT(in_combiner);
in_combiner = false;
grpc_call_next_op(
self_->elem(),
absl::exchange(self_->send_initial_metadata_batch_, nullptr));
}
if (repoll_) {
if (in_combiner) {
self_->WakeInsideCombiner();
} else {
struct NextPoll : public grpc_closure {
grpc_call_stack* call_stack;
ClientCallData* call_data;
};
auto run = [](void* p, grpc_error_handle) {
auto* next_poll = static_cast<NextPoll*>(p);
next_poll->call_data->WakeInsideCombiner();
GRPC_CALL_STACK_UNREF(next_poll->call_stack, "re-poll");
delete next_poll;
};
auto* p = absl::make_unique<NextPoll>().release();
p->call_stack = self_->call_stack();
p->call_data = self_;
GRPC_CALL_STACK_REF(self_->call_stack(), "re-poll");
GRPC_CLOSURE_INIT(p, run, p, nullptr);
GRPC_CALL_COMBINER_START(self_->call_combiner(), p, GRPC_ERROR_NONE,
"re-poll");
}
} else if (in_combiner) {
GRPC_CALL_COMBINER_STOP(self_->call_combiner(), "poll paused");
}
GRPC_CALL_STACK_UNREF(self_->call_stack(), "finish_poll");
}
void Repoll() { repoll_ = true; }
void ForwardSendInitialMetadata() { forward_send_initial_metadata_ = true; }
private:
ManualConstructor<ScopedActivity> scoped_activity_;
ClientCallData* self_;
CallCombinerClosureList call_closures_;
grpc_error_handle cancel_send_initial_metadata_error_ = GRPC_ERROR_NONE;
grpc_transport_stream_op_batch* forward_batch_ = nullptr;
bool repoll_ = false;
bool forward_send_initial_metadata_ = false;
bool have_scoped_activity_;
};
ClientCallData::ClientCallData(grpc_call_element* elem,
const grpc_call_element_args* args)
: BaseCallData(elem, args) {
const grpc_call_element_args* args,
uint8_t flags)
: BaseCallData(elem, args, flags) {
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReadyCallback, this,
grpc_schedule_on_exec_ctx);
if (server_initial_metadata_latch() != nullptr) {
recv_initial_metadata_ = arena()->New<RecvInitialMetadata>();
}
}
ClientCallData::~ClientCallData() {
GPR_ASSERT(!is_polling_);
GPR_ASSERT(poll_ctx_ == nullptr);
GRPC_ERROR_UNREF(cancelled_error_);
if (recv_initial_metadata_ != nullptr) {
recv_initial_metadata_->~RecvInitialMetadata();
}
}
// Activity implementation.
void ClientCallData::ForceImmediateRepoll() {
GPR_ASSERT(is_polling_);
repoll_ = true;
GPR_ASSERT(poll_ctx_ != nullptr);
poll_ctx_->Repoll();
}
// Handle one grpc_transport_stream_op_batch
@ -88,6 +371,36 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* batch) {
return;
}
if (recv_initial_metadata_ != nullptr && batch->recv_initial_metadata) {
switch (recv_initial_metadata_->state) {
case RecvInitialMetadata::kInitial:
recv_initial_metadata_->state =
RecvInitialMetadata::kHookedWaitingForLatch;
break;
case RecvInitialMetadata::kGotLatch:
recv_initial_metadata_->state = RecvInitialMetadata::kHookedAndGotLatch;
break;
case RecvInitialMetadata::kHookedWaitingForLatch:
case RecvInitialMetadata::kHookedAndGotLatch:
case RecvInitialMetadata::kCompleteWaitingForLatch:
case RecvInitialMetadata::kCompleteAndGotLatch:
case RecvInitialMetadata::kCompleteAndSetLatch:
case RecvInitialMetadata::kResponded:
abort(); // unreachable
}
auto cb = [](void* ptr, grpc_error_handle error) {
ClientCallData* self = static_cast<ClientCallData*>(ptr);
self->RecvInitialMetadataReady(error);
};
recv_initial_metadata_->metadata =
batch->payload->recv_initial_metadata.recv_initial_metadata;
recv_initial_metadata_->original_on_ready =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
GRPC_CLOSURE_INIT(&recv_initial_metadata_->on_ready, cb, this, nullptr);
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&recv_initial_metadata_->on_ready;
}
// send_initial_metadata: seeing this triggers the start of the promise part
// of this filter.
if (batch->send_initial_metadata) {
@ -164,6 +477,25 @@ void ClientCallData::Cancel(grpc_error_handle error) {
} else {
send_initial_state_ = SendInitialState::kCancelled;
}
if (recv_initial_metadata_ != nullptr) {
switch (recv_initial_metadata_->state) {
case RecvInitialMetadata::kCompleteWaitingForLatch:
case RecvInitialMetadata::kCompleteAndGotLatch:
case RecvInitialMetadata::kCompleteAndSetLatch:
recv_initial_metadata_->state = RecvInitialMetadata::kResponded;
GRPC_CALL_COMBINER_START(
call_combiner(),
absl::exchange(recv_initial_metadata_->original_on_ready, nullptr),
GRPC_ERROR_REF(error), "propagate cancellation");
break;
case RecvInitialMetadata::kInitial:
case RecvInitialMetadata::kGotLatch:
case RecvInitialMetadata::kHookedWaitingForLatch:
case RecvInitialMetadata::kHookedAndGotLatch:
case RecvInitialMetadata::kResponded:
break;
}
}
}
// Begin running the promise - which will ultimately take some initial
@ -173,18 +505,48 @@ void ClientCallData::StartPromise() {
ChannelFilter* filter = static_cast<ChannelFilter*>(elem()->channel_data);
// Construct the promise.
{
ScopedActivity activity(this);
promise_ = filter->MakeCallPromise(
CallArgs{
WrapMetadata(send_initial_metadata_batch_->payload
->send_initial_metadata.send_initial_metadata),
nullptr},
[this](CallArgs call_args) {
return MakeNextPromise(std::move(call_args));
});
PollContext ctx(this);
promise_ = filter->MakeCallPromise(
CallArgs{WrapMetadata(send_initial_metadata_batch_->payload
->send_initial_metadata.send_initial_metadata),
server_initial_metadata_latch()},
[this](CallArgs call_args) {
return MakeNextPromise(std::move(call_args));
});
ctx.Run();
}
void ClientCallData::RecvInitialMetadataReady(grpc_error_handle error) {
ScopedContext context(this);
switch (recv_initial_metadata_->state) {
case RecvInitialMetadata::kHookedWaitingForLatch:
recv_initial_metadata_->state =
RecvInitialMetadata::kCompleteWaitingForLatch;
break;
case RecvInitialMetadata::kHookedAndGotLatch:
recv_initial_metadata_->state = RecvInitialMetadata::kCompleteAndGotLatch;
break;
case RecvInitialMetadata::kInitial:
case RecvInitialMetadata::kGotLatch:
case RecvInitialMetadata::kCompleteWaitingForLatch:
case RecvInitialMetadata::kCompleteAndGotLatch:
case RecvInitialMetadata::kCompleteAndSetLatch:
case RecvInitialMetadata::kResponded:
abort(); // unreachable
}
if (error != GRPC_ERROR_NONE) {
recv_initial_metadata_->state = RecvInitialMetadata::kResponded;
GRPC_CALL_COMBINER_START(
call_combiner(),
absl::exchange(recv_initial_metadata_->original_on_ready, nullptr),
GRPC_ERROR_REF(error), "propagate cancellation");
} else if (send_initial_state_ == SendInitialState::kCancelled) {
recv_initial_metadata_->state = RecvInitialMetadata::kResponded;
GRPC_CALL_COMBINER_START(
call_combiner(),
absl::exchange(recv_initial_metadata_->original_on_ready, nullptr),
GRPC_ERROR_REF(cancelled_error_), "propagate cancellation");
}
// Poll once.
WakeInsideCombiner();
}
@ -207,10 +569,42 @@ void ClientCallData::HookRecvTrailingMetadata(
// - return a wrapper around PollTrailingMetadata as the promise.
ArenaPromise<ServerMetadataHandle> ClientCallData::MakeNextPromise(
CallArgs call_args) {
GPR_ASSERT(poll_ctx_ != nullptr);
GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued);
send_initial_metadata_batch_->payload->send_initial_metadata
.send_initial_metadata =
UnwrapMetadata(std::move(call_args.client_initial_metadata));
if (recv_initial_metadata_ != nullptr) {
// Call args should contain a latch for receiving initial metadata.
// It might be the one we passed in - in which case we know this filter
// only wants to examine the metadata, or it might be a new instance, in
// which case we know the filter wants to mutate.
GPR_ASSERT(call_args.server_initial_metadata != nullptr);
recv_initial_metadata_->server_initial_metadata_publisher =
call_args.server_initial_metadata;
switch (recv_initial_metadata_->state) {
case RecvInitialMetadata::kInitial:
recv_initial_metadata_->state = RecvInitialMetadata::kGotLatch;
break;
case RecvInitialMetadata::kHookedWaitingForLatch:
recv_initial_metadata_->state = RecvInitialMetadata::kHookedAndGotLatch;
poll_ctx_->Repoll();
break;
case RecvInitialMetadata::kCompleteWaitingForLatch:
recv_initial_metadata_->state =
RecvInitialMetadata::kCompleteAndGotLatch;
poll_ctx_->Repoll();
break;
case RecvInitialMetadata::kGotLatch:
case RecvInitialMetadata::kHookedAndGotLatch:
case RecvInitialMetadata::kCompleteAndGotLatch:
case RecvInitialMetadata::kCompleteAndSetLatch:
case RecvInitialMetadata::kResponded:
abort(); // unreachable
}
} else {
GPR_ASSERT(call_args.server_initial_metadata == nullptr);
}
return ArenaPromise<ServerMetadataHandle>(
[this]() { return PollTrailingMetadata(); });
}
@ -220,6 +614,7 @@ ArenaPromise<ServerMetadataHandle> ClientCallData::MakeNextPromise(
// All polls: await receiving the trailing metadata, then return it to the
// application.
Poll<ServerMetadataHandle> ClientCallData::PollTrailingMetadata() {
GPR_ASSERT(poll_ctx_ != nullptr);
if (send_initial_state_ == SendInitialState::kQueued) {
// First poll: pass the send_initial_metadata op down the stack.
GPR_ASSERT(send_initial_metadata_batch_ != nullptr);
@ -229,7 +624,7 @@ Poll<ServerMetadataHandle> ClientCallData::PollTrailingMetadata() {
HookRecvTrailingMetadata(send_initial_metadata_batch_);
recv_trailing_state_ = RecvTrailingState::kForwarded;
}
forward_send_initial_metadata_ = true;
poll_ctx_->ForwardSendInitialMetadata();
}
switch (recv_trailing_state_) {
case RecvTrailingState::kInitial:
@ -264,6 +659,11 @@ void ClientCallData::RecvTrailingMetadataReadyCallback(
}
void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
if (recv_trailing_state_ == RecvTrailingState::kCancelled) {
Closure::Run(DEBUG_LOCATION, original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(cancelled_error_));
return;
}
// If there was an error, we'll put that into the trailing metadata and
// proceed as if there was not.
if (error != GRPC_ERROR_NONE) {
@ -291,131 +691,7 @@ void ClientCallData::SetStatusFromError(grpc_metadata_batch* metadata,
}
// Wakeup and poll the promise if appropriate.
void ClientCallData::WakeInsideCombiner() {
GPR_ASSERT(!is_polling_);
grpc_closure* call_closure = nullptr;
is_polling_ = true;
grpc_error_handle cancel_send_initial_metadata_error = GRPC_ERROR_NONE;
grpc_transport_stream_op_batch* forward_batch = nullptr;
switch (send_initial_state_) {
case SendInitialState::kQueued:
case SendInitialState::kForwarded: {
// Poll the promise once since we're waiting for it.
Poll<ServerMetadataHandle> poll;
{
ScopedActivity activity(this);
poll = promise_();
}
if (auto* r = absl::get_if<ServerMetadataHandle>(&poll)) {
promise_ = ArenaPromise<ServerMetadataHandle>();
auto* md = UnwrapMetadata(std::move(*r));
bool destroy_md = true;
if (recv_trailing_state_ == RecvTrailingState::kComplete) {
if (recv_trailing_metadata_ != md) {
*recv_trailing_metadata_ = std::move(*md);
} else {
destroy_md = false;
}
recv_trailing_state_ = RecvTrailingState::kResponded;
call_closure =
absl::exchange(original_recv_trailing_metadata_ready_, nullptr);
} else {
GPR_ASSERT(*md->get_pointer(GrpcStatusMetadata()) != GRPC_STATUS_OK);
grpc_error_handle error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"early return from promise based filter"),
GRPC_ERROR_INT_GRPC_STATUS,
*md->get_pointer(GrpcStatusMetadata()));
if (auto* message = md->get_pointer(GrpcMessageMetadata())) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
message->as_string_view());
}
GRPC_ERROR_UNREF(cancelled_error_);
cancelled_error_ = GRPC_ERROR_REF(error);
if (send_initial_state_ == SendInitialState::kQueued) {
send_initial_state_ = SendInitialState::kCancelled;
cancel_send_initial_metadata_error = error;
} else {
GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kInitial ||
recv_trailing_state_ == RecvTrailingState::kForwarded);
call_combiner()->Cancel(GRPC_ERROR_REF(error));
forward_batch = grpc_make_transport_stream_op(GRPC_CLOSURE_CREATE(
[](void*, grpc_error_handle) {}, nullptr, nullptr));
forward_batch->cancel_stream = true;
forward_batch->payload->cancel_stream.cancel_error = error;
}
recv_trailing_state_ = RecvTrailingState::kCancelled;
}
if (destroy_md) {
md->~grpc_metadata_batch();
}
}
} break;
case SendInitialState::kInitial:
case SendInitialState::kCancelled:
// If we get a response without sending anything, we just propagate
// that up. (note: that situation isn't possible once we finish the
// promise transition).
if (recv_trailing_state_ == RecvTrailingState::kComplete) {
recv_trailing_state_ = RecvTrailingState::kResponded;
call_closure =
absl::exchange(original_recv_trailing_metadata_ready_, nullptr);
}
break;
}
GRPC_CALL_STACK_REF(call_stack(), "finish_poll");
is_polling_ = false;
bool in_combiner = true;
bool repoll = absl::exchange(repoll_, false);
if (forward_batch != nullptr) {
GPR_ASSERT(in_combiner);
in_combiner = false;
forward_send_initial_metadata_ = false;
grpc_call_next_op(elem(), forward_batch);
}
if (cancel_send_initial_metadata_error != GRPC_ERROR_NONE) {
GPR_ASSERT(in_combiner);
forward_send_initial_metadata_ = false;
in_combiner = false;
grpc_transport_stream_op_batch_finish_with_failure(
absl::exchange(send_initial_metadata_batch_, nullptr),
cancel_send_initial_metadata_error, call_combiner());
}
if (absl::exchange(forward_send_initial_metadata_, false)) {
GPR_ASSERT(in_combiner);
in_combiner = false;
grpc_call_next_op(elem(),
absl::exchange(send_initial_metadata_batch_, nullptr));
}
if (call_closure != nullptr) {
GPR_ASSERT(in_combiner);
in_combiner = false;
Closure::Run(DEBUG_LOCATION, call_closure, GRPC_ERROR_NONE);
}
if (repoll) {
if (in_combiner) {
WakeInsideCombiner();
} else {
struct NextPoll : public grpc_closure {
grpc_call_stack* call_stack;
ClientCallData* call_data;
};
auto run = [](void* p, grpc_error_handle) {
auto* next_poll = static_cast<NextPoll*>(p);
next_poll->call_data->WakeInsideCombiner();
GRPC_CALL_STACK_UNREF(next_poll->call_stack, "re-poll");
delete next_poll;
};
auto* p = new NextPoll;
GRPC_CALL_STACK_REF(call_stack(), "re-poll");
GRPC_CLOSURE_INIT(p, run, p, nullptr);
GRPC_CALL_COMBINER_START(call_combiner(), p, GRPC_ERROR_NONE, "re-poll");
}
} else if (in_combiner) {
GRPC_CALL_COMBINER_STOP(call_combiner(), "poll paused");
}
GRPC_CALL_STACK_UNREF(call_stack(), "finish_poll");
}
void ClientCallData::WakeInsideCombiner() { PollContext(this).Run(); }
void ClientCallData::OnWakeup() {
ScopedContext context(this);
@ -426,8 +702,9 @@ void ClientCallData::OnWakeup() {
// ServerCallData
ServerCallData::ServerCallData(grpc_call_element* elem,
const grpc_call_element_args* args)
: BaseCallData(elem, args) {
const grpc_call_element_args* args,
uint8_t flags)
: BaseCallData(elem, args, flags) {
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_,
RecvInitialMetadataReadyCallback, this,
grpc_schedule_on_exec_ctx);
@ -589,11 +866,12 @@ void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) {
ScopedContext context(this);
// Construct the promise.
ChannelFilter* filter = static_cast<ChannelFilter*>(elem()->channel_data);
promise_ = filter->MakeCallPromise(
CallArgs{WrapMetadata(recv_initial_metadata_), nullptr},
[this](CallArgs call_args) {
return MakeNextPromise(std::move(call_args));
});
promise_ =
filter->MakeCallPromise(CallArgs{WrapMetadata(recv_initial_metadata_),
server_initial_metadata_latch()},
[this](CallArgs call_args) {
return MakeNextPromise(std::move(call_args));
});
// Poll once.
bool own_error = false;
WakeInsideCombiner([&error, &own_error](grpc_error_handle new_error) {

@ -76,18 +76,17 @@ enum class FilterEndpoint {
kServer,
};
// Flags for MakePromiseBasedFilter.
static constexpr uint8_t kFilterExaminesServerInitialMetadata = 1;
namespace promise_filter_detail {
// Call data shared between all implementations of promise-based filters.
class BaseCallData : public Activity, private Wakeable {
public:
BaseCallData(grpc_call_element* elem, const grpc_call_element_args* args)
: call_stack_(args->call_stack),
elem_(elem),
arena_(args->arena),
call_combiner_(args->call_combiner),
deadline_(args->deadline),
context_(args->context) {}
BaseCallData(grpc_call_element* elem, const grpc_call_element_args* args,
uint8_t flags);
~BaseCallData() override;
void set_pollent(grpc_polling_entity* pollent) {
GPR_ASSERT(nullptr ==
@ -130,10 +129,14 @@ class BaseCallData : public Activity, private Wakeable {
return p.Unwrap();
}
Arena* arena() { return arena_; }
grpc_call_element* elem() const { return elem_; }
CallCombiner* call_combiner() const { return call_combiner_; }
Timestamp deadline() const { return deadline_; }
grpc_call_stack* call_stack() const { return call_stack_; }
Latch<ServerMetadata*>* server_initial_metadata_latch() const {
return server_initial_metadata_latch_;
}
private:
// Wakeable implementation.
@ -150,11 +153,13 @@ class BaseCallData : public Activity, private Wakeable {
CallFinalization finalization_;
grpc_call_context_element* const context_;
std::atomic<grpc_polling_entity*> pollent_{nullptr};
Latch<ServerMetadata*>* server_initial_metadata_latch_ = nullptr;
};
class ClientCallData : public BaseCallData {
public:
ClientCallData(grpc_call_element* elem, const grpc_call_element_args* args);
ClientCallData(grpc_call_element* elem, const grpc_call_element_args* args,
uint8_t flags);
~ClientCallData() override;
// Activity implementation.
@ -195,6 +200,9 @@ class ClientCallData : public BaseCallData {
kCancelled
};
struct RecvInitialMetadata;
class PollContext;
// Handle cancellation.
void Cancel(grpc_error_handle error);
// Begin running the promise - which will ultimately take some initial
@ -217,6 +225,7 @@ class ClientCallData : public BaseCallData {
static void RecvTrailingMetadataReadyCallback(void* arg,
grpc_error_handle error);
void RecvTrailingMetadataReady(grpc_error_handle error);
void RecvInitialMetadataReady(grpc_error_handle error);
// Given an error, fill in ServerMetadataHandle to represent that error.
void SetStatusFromError(grpc_metadata_batch* metadata,
grpc_error_handle error);
@ -230,6 +239,8 @@ class ClientCallData : public BaseCallData {
grpc_transport_stream_op_batch* send_initial_metadata_batch_ = nullptr;
// Pointer to where trailing metadata will be stored.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
// State tracking recv initial metadata for filters that care about it.
RecvInitialMetadata* recv_initial_metadata_ = nullptr;
// Closure to call when we're done with the trailing metadata.
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
// Our closure pointing to RecvTrailingMetadataReadyCallback.
@ -240,17 +251,14 @@ class ClientCallData : public BaseCallData {
SendInitialState send_initial_state_ = SendInitialState::kInitial;
// State of the recv_trailing_metadata op.
RecvTrailingState recv_trailing_state_ = RecvTrailingState::kInitial;
// Whether we're currently polling the promise.
bool is_polling_ = false;
// Should we repoll after completing polling?
bool repoll_ = false;
// Whether we should forward send initial metadata after polling?
bool forward_send_initial_metadata_ = false;
// Polling related data. Non-null if we're actively polling
PollContext* poll_ctx_ = nullptr;
};
class ServerCallData : public BaseCallData {
public:
ServerCallData(grpc_call_element* elem, const grpc_call_element_args* args);
ServerCallData(grpc_call_element* elem, const grpc_call_element_args* args,
uint8_t flags);
~ServerCallData() override;
// Activity implementation.
@ -356,7 +364,7 @@ class CallData<ChannelFilter, FilterEndpoint::kServer> : public ServerCallData {
// };
// TODO(ctiller): allow implementing get_channel_info, start_transport_op in
// some way on ChannelFilter.
template <typename F, FilterEndpoint kEndpoint>
template <typename F, FilterEndpoint kEndpoint, uint8_t kFlags = 0>
absl::enable_if_t<std::is_base_of<ChannelFilter, F>::value, grpc_channel_filter>
MakePromiseBasedFilter(const char* name) {
using CallData = promise_filter_detail::CallData<F, kEndpoint>;
@ -383,7 +391,7 @@ MakePromiseBasedFilter(const char* name) {
sizeof(CallData),
// init_call_elem
[](grpc_call_element* elem, const grpc_call_element_args* args) {
new (elem->call_data) CallData(elem, args);
new (elem->call_data) CallData(elem, args, kFlags);
return GRPC_ERROR_NONE;
},
// set_pollset_or_pollset_set

@ -75,7 +75,7 @@ class CallPushPull {
if (IsStatusOk(*status)) {
done_.set(kDonePush);
} else {
return std::move(*status);
return Result(std::move(*status));
}
}
}
@ -97,7 +97,7 @@ class CallPushPull {
if (IsStatusOk(*status)) {
done_.set(kDonePull);
} else {
return std::move(*status);
return Result(std::move(*status));
}
}
}

@ -39,11 +39,12 @@ inline absl::Status IntoStatus(absl::Status* status) {
}
} // namespace promise_detail
} // namespace grpc_core
// Return true if the status represented by the argument is ok, false if not.
// By implementing this function for other, non-absl::Status types, those types
// can participate in TrySeq as result types that affect control flow.
inline bool IsStatusOk(const absl::Status& status) { return status.ok(); }
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_DETAIL_STATUS_H

@ -98,7 +98,7 @@ class MetadataHandle {
T* handle_ = nullptr;
};
// Trailing metadata type
// Server metadata type
// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
using ServerMetadata = grpc_metadata_batch;
using ServerMetadataHandle = MetadataHandle<ServerMetadata>;

@ -561,7 +561,7 @@ static void BM_IsolatedFilter(benchmark::State& state) {
grpc_slice method = grpc_slice_from_static_string("/foo/bar");
grpc_call_final_info final_info;
TestOp test_op_data;
const int kArenaSize = 4096;
const int kArenaSize = 32 * 1024 * 1024;
grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
grpc_call_element_args call_args{
call_stack,
@ -617,7 +617,8 @@ typedef Fixture<&grpc_server_deadline_filter, CHECKS_NOT_LAST>
ServerDeadlineFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ServerDeadlineFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ServerDeadlineFilter, SendEmptyMetadata);
typedef Fixture<&grpc_http_client_filter, CHECKS_NOT_LAST | REQUIRES_TRANSPORT>
typedef Fixture<&grpc_core::HttpClientFilter::kFilter,
CHECKS_NOT_LAST | REQUIRES_TRANSPORT>
HttpClientFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpClientFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpClientFilter, SendEmptyMetadata);

@ -2184,6 +2184,7 @@ src/core/lib/profiling/timers.h \
src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/call_push_pull.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \

@ -1979,6 +1979,7 @@ src/core/lib/profiling/timers.h \
src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/call_push_pull.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \

Loading…
Cancel
Save