|
|
@ -136,7 +136,7 @@ void ClientCallData::Cancel(grpc_error_handle error) { |
|
|
|
GRPC_ERROR_UNREF(cancelled_error_); |
|
|
|
GRPC_ERROR_UNREF(cancelled_error_); |
|
|
|
cancelled_error_ = GRPC_ERROR_REF(error); |
|
|
|
cancelled_error_ = GRPC_ERROR_REF(error); |
|
|
|
// Stop running the promise.
|
|
|
|
// Stop running the promise.
|
|
|
|
promise_ = ArenaPromise<TrailingMetadata>(); |
|
|
|
promise_ = ArenaPromise<ServerMetadataHandle>(); |
|
|
|
// If we have an op queued, fail that op.
|
|
|
|
// If we have an op queued, fail that op.
|
|
|
|
// Record what we've done.
|
|
|
|
// Record what we've done.
|
|
|
|
if (send_initial_state_ == SendInitialState::kQueued) { |
|
|
|
if (send_initial_state_ == SendInitialState::kQueued) { |
|
|
@ -176,10 +176,12 @@ void ClientCallData::StartPromise() { |
|
|
|
{ |
|
|
|
{ |
|
|
|
ScopedActivity activity(this); |
|
|
|
ScopedActivity activity(this); |
|
|
|
promise_ = filter->MakeCallPromise( |
|
|
|
promise_ = filter->MakeCallPromise( |
|
|
|
WrapMetadata(send_initial_metadata_batch_->payload |
|
|
|
CallArgs{ |
|
|
|
->send_initial_metadata.send_initial_metadata), |
|
|
|
WrapMetadata(send_initial_metadata_batch_->payload |
|
|
|
[this](ClientInitialMetadata initial_metadata) { |
|
|
|
->send_initial_metadata.send_initial_metadata), |
|
|
|
return MakeNextPromise(std::move(initial_metadata)); |
|
|
|
nullptr}, |
|
|
|
|
|
|
|
[this](CallArgs call_args) { |
|
|
|
|
|
|
|
return MakeNextPromise(std::move(call_args)); |
|
|
|
}); |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
// Poll once.
|
|
|
|
// Poll once.
|
|
|
@ -203,12 +205,13 @@ void ClientCallData::HookRecvTrailingMetadata( |
|
|
|
// Effectively:
|
|
|
|
// Effectively:
|
|
|
|
// - put the modified initial metadata into the batch to be sent down.
|
|
|
|
// - put the modified initial metadata into the batch to be sent down.
|
|
|
|
// - return a wrapper around PollTrailingMetadata as the promise.
|
|
|
|
// - return a wrapper around PollTrailingMetadata as the promise.
|
|
|
|
ArenaPromise<TrailingMetadata> ClientCallData::MakeNextPromise( |
|
|
|
ArenaPromise<ServerMetadataHandle> ClientCallData::MakeNextPromise( |
|
|
|
ClientInitialMetadata initial_metadata) { |
|
|
|
CallArgs call_args) { |
|
|
|
GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued); |
|
|
|
GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued); |
|
|
|
send_initial_metadata_batch_->payload->send_initial_metadata |
|
|
|
send_initial_metadata_batch_->payload->send_initial_metadata |
|
|
|
.send_initial_metadata = UnwrapMetadata(std::move(initial_metadata)); |
|
|
|
.send_initial_metadata = |
|
|
|
return ArenaPromise<TrailingMetadata>( |
|
|
|
UnwrapMetadata(std::move(call_args.client_initial_metadata)); |
|
|
|
|
|
|
|
return ArenaPromise<ServerMetadataHandle>( |
|
|
|
[this]() { return PollTrailingMetadata(); }); |
|
|
|
[this]() { return PollTrailingMetadata(); }); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -216,7 +219,7 @@ ArenaPromise<TrailingMetadata> ClientCallData::MakeNextPromise( |
|
|
|
// First poll: send the send_initial_metadata op down the stack.
|
|
|
|
// First poll: send the send_initial_metadata op down the stack.
|
|
|
|
// All polls: await receiving the trailing metadata, then return it to the
|
|
|
|
// All polls: await receiving the trailing metadata, then return it to the
|
|
|
|
// application.
|
|
|
|
// application.
|
|
|
|
Poll<TrailingMetadata> ClientCallData::PollTrailingMetadata() { |
|
|
|
Poll<ServerMetadataHandle> ClientCallData::PollTrailingMetadata() { |
|
|
|
if (send_initial_state_ == SendInitialState::kQueued) { |
|
|
|
if (send_initial_state_ == SendInitialState::kQueued) { |
|
|
|
// First poll: pass the send_initial_metadata op down the stack.
|
|
|
|
// First poll: pass the send_initial_metadata op down the stack.
|
|
|
|
GPR_ASSERT(send_initial_metadata_batch_ != nullptr); |
|
|
|
GPR_ASSERT(send_initial_metadata_batch_ != nullptr); |
|
|
@ -274,7 +277,7 @@ void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) { |
|
|
|
WakeInsideCombiner(); |
|
|
|
WakeInsideCombiner(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Given an error, fill in TrailingMetadata to represent that error.
|
|
|
|
// Given an error, fill in ServerMetadataHandle to represent that error.
|
|
|
|
void ClientCallData::SetStatusFromError(grpc_metadata_batch* metadata, |
|
|
|
void ClientCallData::SetStatusFromError(grpc_metadata_batch* metadata, |
|
|
|
grpc_error_handle error) { |
|
|
|
grpc_error_handle error) { |
|
|
|
grpc_status_code status_code = GRPC_STATUS_UNKNOWN; |
|
|
|
grpc_status_code status_code = GRPC_STATUS_UNKNOWN; |
|
|
@ -298,13 +301,13 @@ void ClientCallData::WakeInsideCombiner() { |
|
|
|
case SendInitialState::kQueued: |
|
|
|
case SendInitialState::kQueued: |
|
|
|
case SendInitialState::kForwarded: { |
|
|
|
case SendInitialState::kForwarded: { |
|
|
|
// Poll the promise once since we're waiting for it.
|
|
|
|
// Poll the promise once since we're waiting for it.
|
|
|
|
Poll<TrailingMetadata> poll; |
|
|
|
Poll<ServerMetadataHandle> poll; |
|
|
|
{ |
|
|
|
{ |
|
|
|
ScopedActivity activity(this); |
|
|
|
ScopedActivity activity(this); |
|
|
|
poll = promise_(); |
|
|
|
poll = promise_(); |
|
|
|
} |
|
|
|
} |
|
|
|
if (auto* r = absl::get_if<TrailingMetadata>(&poll)) { |
|
|
|
if (auto* r = absl::get_if<ServerMetadataHandle>(&poll)) { |
|
|
|
promise_ = ArenaPromise<TrailingMetadata>(); |
|
|
|
promise_ = ArenaPromise<ServerMetadataHandle>(); |
|
|
|
auto* md = UnwrapMetadata(std::move(*r)); |
|
|
|
auto* md = UnwrapMetadata(std::move(*r)); |
|
|
|
bool destroy_md = true; |
|
|
|
bool destroy_md = true; |
|
|
|
if (recv_trailing_state_ == RecvTrailingState::kComplete) { |
|
|
|
if (recv_trailing_state_ == RecvTrailingState::kComplete) { |
|
|
@ -505,7 +508,7 @@ void ServerCallData::Cancel(grpc_error_handle error) { |
|
|
|
GRPC_ERROR_UNREF(cancelled_error_); |
|
|
|
GRPC_ERROR_UNREF(cancelled_error_); |
|
|
|
cancelled_error_ = GRPC_ERROR_REF(error); |
|
|
|
cancelled_error_ = GRPC_ERROR_REF(error); |
|
|
|
// Stop running the promise.
|
|
|
|
// Stop running the promise.
|
|
|
|
promise_ = ArenaPromise<TrailingMetadata>(); |
|
|
|
promise_ = ArenaPromise<ServerMetadataHandle>(); |
|
|
|
if (send_trailing_state_ == SendTrailingState::kQueued) { |
|
|
|
if (send_trailing_state_ == SendTrailingState::kQueued) { |
|
|
|
send_trailing_state_ = SendTrailingState::kCancelled; |
|
|
|
send_trailing_state_ = SendTrailingState::kCancelled; |
|
|
|
struct FailBatch : public grpc_closure { |
|
|
|
struct FailBatch : public grpc_closure { |
|
|
@ -534,20 +537,20 @@ void ServerCallData::Cancel(grpc_error_handle error) { |
|
|
|
// Effectively:
|
|
|
|
// Effectively:
|
|
|
|
// - put the modified initial metadata into the batch being sent up.
|
|
|
|
// - put the modified initial metadata into the batch being sent up.
|
|
|
|
// - return a wrapper around PollTrailingMetadata as the promise.
|
|
|
|
// - return a wrapper around PollTrailingMetadata as the promise.
|
|
|
|
ArenaPromise<TrailingMetadata> ServerCallData::MakeNextPromise( |
|
|
|
ArenaPromise<ServerMetadataHandle> ServerCallData::MakeNextPromise( |
|
|
|
ClientInitialMetadata initial_metadata) { |
|
|
|
CallArgs call_args) { |
|
|
|
GPR_ASSERT(recv_initial_state_ == RecvInitialState::kComplete); |
|
|
|
GPR_ASSERT(recv_initial_state_ == RecvInitialState::kComplete); |
|
|
|
GPR_ASSERT(UnwrapMetadata(std::move(initial_metadata)) == |
|
|
|
GPR_ASSERT(UnwrapMetadata(std::move(call_args.client_initial_metadata)) == |
|
|
|
recv_initial_metadata_); |
|
|
|
recv_initial_metadata_); |
|
|
|
forward_recv_initial_metadata_callback_ = true; |
|
|
|
forward_recv_initial_metadata_callback_ = true; |
|
|
|
return ArenaPromise<TrailingMetadata>( |
|
|
|
return ArenaPromise<ServerMetadataHandle>( |
|
|
|
[this]() { return PollTrailingMetadata(); }); |
|
|
|
[this]() { return PollTrailingMetadata(); }); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Wrapper to make it look like we're calling the next filter as a promise.
|
|
|
|
// Wrapper to make it look like we're calling the next filter as a promise.
|
|
|
|
// All polls: await sending the trailing metadata, then foward it down the
|
|
|
|
// All polls: await sending the trailing metadata, then foward it down the
|
|
|
|
// stack.
|
|
|
|
// stack.
|
|
|
|
Poll<TrailingMetadata> ServerCallData::PollTrailingMetadata() { |
|
|
|
Poll<ServerMetadataHandle> ServerCallData::PollTrailingMetadata() { |
|
|
|
switch (send_trailing_state_) { |
|
|
|
switch (send_trailing_state_) { |
|
|
|
case SendTrailingState::kInitial: |
|
|
|
case SendTrailingState::kInitial: |
|
|
|
return Pending{}; |
|
|
|
return Pending{}; |
|
|
@ -587,9 +590,9 @@ void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) { |
|
|
|
// Construct the promise.
|
|
|
|
// Construct the promise.
|
|
|
|
ChannelFilter* filter = static_cast<ChannelFilter*>(elem()->channel_data); |
|
|
|
ChannelFilter* filter = static_cast<ChannelFilter*>(elem()->channel_data); |
|
|
|
promise_ = filter->MakeCallPromise( |
|
|
|
promise_ = filter->MakeCallPromise( |
|
|
|
WrapMetadata(recv_initial_metadata_), |
|
|
|
CallArgs{WrapMetadata(recv_initial_metadata_), nullptr}, |
|
|
|
[this](ClientInitialMetadata initial_metadata) { |
|
|
|
[this](CallArgs call_args) { |
|
|
|
return MakeNextPromise(std::move(initial_metadata)); |
|
|
|
return MakeNextPromise(std::move(call_args)); |
|
|
|
}); |
|
|
|
}); |
|
|
|
// Poll once.
|
|
|
|
// Poll once.
|
|
|
|
bool own_error = false; |
|
|
|
bool own_error = false; |
|
|
@ -610,12 +613,12 @@ void ServerCallData::WakeInsideCombiner( |
|
|
|
bool forward_send_trailing_metadata = false; |
|
|
|
bool forward_send_trailing_metadata = false; |
|
|
|
is_polling_ = true; |
|
|
|
is_polling_ = true; |
|
|
|
if (recv_initial_state_ == RecvInitialState::kComplete) { |
|
|
|
if (recv_initial_state_ == RecvInitialState::kComplete) { |
|
|
|
Poll<TrailingMetadata> poll; |
|
|
|
Poll<ServerMetadataHandle> poll; |
|
|
|
{ |
|
|
|
{ |
|
|
|
ScopedActivity activity(this); |
|
|
|
ScopedActivity activity(this); |
|
|
|
poll = promise_(); |
|
|
|
poll = promise_(); |
|
|
|
} |
|
|
|
} |
|
|
|
if (auto* r = absl::get_if<TrailingMetadata>(&poll)) { |
|
|
|
if (auto* r = absl::get_if<ServerMetadataHandle>(&poll)) { |
|
|
|
auto* md = UnwrapMetadata(std::move(*r)); |
|
|
|
auto* md = UnwrapMetadata(std::move(*r)); |
|
|
|
bool destroy_md = true; |
|
|
|
bool destroy_md = true; |
|
|
|
switch (send_trailing_state_) { |
|
|
|
switch (send_trailing_state_) { |
|
|
|