pull/32347/head
Craig Tiller 2 years ago
parent dd3dea9fb5
commit 1e88193edd
  1. 23
      src/core/lib/surface/call.cc

@ -2244,8 +2244,7 @@ class PromiseBasedCall : public Call,
void StartRecvMessage(const grpc_op& op, const Completion& completion,
PipeReceiver<MessageHandle>* receiver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void PollRecvMessage(grpc_compression_algorithm compression_algorithm)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void PollRecvMessage() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void CancelRecvMessage(SourceLocation = {})
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void StartSendMessage(const grpc_op& op, const Completion& completion,
@ -2662,8 +2661,7 @@ void PromiseBasedCall::StartRecvMessage(const grpc_op& op,
outstanding_recv_.emplace(receiver->Next());
}
void PromiseBasedCall::PollRecvMessage(
grpc_compression_algorithm incoming_compression_algorithm) {
void PromiseBasedCall::PollRecvMessage() {
if (!outstanding_recv_.has_value()) return;
Poll<NextResult<MessageHandle>> r = (*outstanding_recv_)();
if (auto* result = absl::get_if<NextResult<MessageHandle>>(&r)) {
@ -2672,9 +2670,9 @@ void PromiseBasedCall::PollRecvMessage(
MessageHandle& message = **result;
NoteLastMessageFlags(message->flags());
if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
(incoming_compression_algorithm != GRPC_COMPRESS_NONE)) {
(incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
*recv_message_ = grpc_raw_compressed_byte_buffer_create(
nullptr, 0, incoming_compression_algorithm);
nullptr, 0, incoming_compression_algorithm());
} else {
*recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
}
@ -3086,14 +3084,14 @@ void ClientPromiseBasedCall::UpdateOnce() {
}
if (auto* result = absl::get_if<ServerMetadataHandle>(&r)) {
if (!server_initial_metadata_ready_.has_value()) {
PollRecvMessage(incoming_compression_algorithm());
PollRecvMessage();
}
AcceptTransportStatsFromContext();
Finish(std::move(*result));
}
}
if (!server_initial_metadata_ready_.has_value()) {
PollRecvMessage(incoming_compression_algorithm());
PollRecvMessage();
}
}
@ -3348,8 +3346,6 @@ class ServerPromiseBasedCall final : public PromiseBasedCall {
SendInitialMetadataState send_initial_metadata_state_ ABSL_GUARDED_BY(mu()) =
absl::monostate{};
ServerMetadataHandle send_trailing_metadata_ ABSL_GUARDED_BY(mu());
grpc_compression_algorithm incoming_compression_algorithm_
ABSL_GUARDED_BY(mu());
bool force_metadata_send_ ABSL_GUARDED_BY(mu()) = false;
RecvCloseOpCancelState recv_close_op_cancel_state_ ABSL_GUARDED_BY(mu());
Completion recv_close_completion_ ABSL_GUARDED_BY(mu());
@ -3393,7 +3389,7 @@ Poll<ServerMetadataHandle> ServerPromiseBasedCall::PollTopOfCall() {
}
PollSendMessage();
PollRecvMessage(incoming_compression_algorithm_);
PollRecvMessage();
if (force_metadata_send_) GPR_ASSERT(!is_sending());
@ -3446,7 +3442,7 @@ void ServerPromiseBasedCall::UpdateOnce() {
}
if (auto* result = absl::get_if<ServerMetadataHandle>(&r)) {
if (!(*result)->get(GrpcCallWasCancelled()).value_or(false)) {
PollRecvMessage(incoming_compression_algorithm_);
PollRecvMessage();
}
Finish(std::move(*result));
promise_ = ArenaPromise<ServerMetadataHandle>();
@ -3663,9 +3659,6 @@ ServerCallContext::MakeTopOfServerCallPromise(
call_->server_to_client_messages_ = call_args.server_to_client_messages;
call_->client_to_server_messages_ = call_args.client_to_server_messages;
call_->send_initial_metadata_state_ = call_args.server_initial_metadata;
call_->incoming_compression_algorithm_ =
call_args.client_initial_metadata->get(GrpcEncodingMetadata())
.value_or(GRPC_COMPRESS_NONE);
call_->client_initial_metadata_ =
std::move(call_args.client_initial_metadata);
call_->ProcessIncomingInitialMetadata(

Loading…
Cancel
Save