|
|
|
@ -207,10 +207,8 @@ void BaseCallData::CapturedBatch::ResumeWith(Flusher* releaser) { |
|
|
|
|
uintptr_t& refcnt = *RefCountField(batch); |
|
|
|
|
if (refcnt == 0) { |
|
|
|
|
// refcnt==0 ==> cancelled
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << releaser->call()->DebugTag() |
|
|
|
|
<< "RESUME BATCH REQUEST CANCELLED"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << releaser->call()->DebugTag() < < < < |
|
|
|
|
"RESUME BATCH REQUEST CANCELLED"; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (--refcnt == 0) { |
|
|
|
@ -266,10 +264,9 @@ BaseCallData::Flusher::~Flusher() { |
|
|
|
|
auto* batch = static_cast<grpc_transport_stream_op_batch*>(p); |
|
|
|
|
BaseCallData* call = |
|
|
|
|
static_cast<BaseCallData*>(batch->handler_private.extra_arg); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << "FLUSHER:forward batch via closure: " |
|
|
|
|
<< grpc_transport_stream_op_batch_string(batch, false); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< "FLUSHER:forward batch via closure: " < < < < |
|
|
|
|
grpc_transport_stream_op_batch_string(batch, false); |
|
|
|
|
grpc_call_next_op(call->elem(), batch); |
|
|
|
|
GRPC_CALL_STACK_UNREF(call->call_stack(), "flusher_batch"); |
|
|
|
|
}; |
|
|
|
@ -278,10 +275,9 @@ BaseCallData::Flusher::~Flusher() { |
|
|
|
|
if (call_->call() != nullptr && call_->call()->traced()) { |
|
|
|
|
batch->is_traced = true; |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << "FLUSHER:queue batch to forward in closure: " |
|
|
|
|
<< grpc_transport_stream_op_batch_string(release_[i], false); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< "FLUSHER:queue batch to forward in closure: " < < < < |
|
|
|
|
grpc_transport_stream_op_batch_string(release_[i], false); |
|
|
|
|
batch->handler_private.extra_arg = call_; |
|
|
|
|
GRPC_CLOSURE_INIT(&batch->handler_private.closure, call_next_op, batch, |
|
|
|
|
nullptr); |
|
|
|
@ -290,10 +286,8 @@ BaseCallData::Flusher::~Flusher() { |
|
|
|
|
"flusher_batch"); |
|
|
|
|
} |
|
|
|
|
call_closures_.RunClosuresWithoutYielding(call_->call_combiner()); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << "FLUSHER:forward batch: " |
|
|
|
|
<< grpc_transport_stream_op_batch_string(release_[0], false); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << "FLUSHER:forward batch: " < < < < |
|
|
|
|
grpc_transport_stream_op_batch_string(release_[0], false); |
|
|
|
|
if (call_->call() != nullptr && call_->call()->traced()) { |
|
|
|
|
release_[0]->is_traced = true; |
|
|
|
|
} |
|
|
|
@ -331,10 +325,8 @@ const char* BaseCallData::SendMessage::StateString(State state) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void BaseCallData::SendMessage::StartOp(CapturedBatch batch) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << base_->LogTag() |
|
|
|
|
<< " SendMessage.StartOp st=" << StateString(state_); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" SendMessage.StartOp st=" << StateString(state_); |
|
|
|
|
switch (state_) { |
|
|
|
|
case State::kInitial: |
|
|
|
|
state_ = State::kGotBatchNoPipe; |
|
|
|
@ -359,10 +351,8 @@ void BaseCallData::SendMessage::StartOp(CapturedBatch batch) { |
|
|
|
|
|
|
|
|
|
template <typename T> |
|
|
|
|
void BaseCallData::SendMessage::GotPipe(T* pipe_end) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << base_->LogTag() |
|
|
|
|
<< " SendMessage.GotPipe st=" << StateString(state_); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" SendMessage.GotPipe st=" << StateString(state_); |
|
|
|
|
CHECK_NE(pipe_end, nullptr); |
|
|
|
|
switch (state_) { |
|
|
|
|
case State::kInitial: |
|
|
|
@ -615,10 +605,8 @@ const char* BaseCallData::ReceiveMessage::StateString(State state) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << base_->LogTag() |
|
|
|
|
<< " ReceiveMessage.StartOp st=" << StateString(state_); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" ReceiveMessage.StartOp st=" << StateString(state_); |
|
|
|
|
switch (state_) { |
|
|
|
|
case State::kInitial: |
|
|
|
|
state_ = State::kForwardedBatchNoPipe; |
|
|
|
@ -656,10 +644,8 @@ void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) { |
|
|
|
|
|
|
|
|
|
template <typename T> |
|
|
|
|
void BaseCallData::ReceiveMessage::GotPipe(T* pipe_end) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << base_->LogTag() |
|
|
|
|
<< " ReceiveMessage.GotPipe st=" << StateString(state_); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" ReceiveMessage.GotPipe st=" << StateString(state_); |
|
|
|
|
switch (state_) { |
|
|
|
|
case State::kInitial: |
|
|
|
|
state_ = State::kIdle; |
|
|
|
@ -901,10 +887,8 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher, |
|
|
|
|
case State::kPulledFromPipe: { |
|
|
|
|
CHECK(push_.has_value()); |
|
|
|
|
if ((*push_)().ready()) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << base_->LogTag() |
|
|
|
|
<< " ReceiveMessage.WakeInsideCombiner push complete"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" ReceiveMessage.WakeInsideCombiner push complete"; |
|
|
|
|
if (state_ == State::kCompletedWhilePulledFromPipe) { |
|
|
|
|
interceptor()->Push()->Close(); |
|
|
|
|
state_ = State::kCancelled; |
|
|
|
@ -1016,10 +1000,9 @@ class ClientCallData::PollContext { |
|
|
|
|
|
|
|
|
|
void Run() { |
|
|
|
|
DCHECK(HasContext<Arena>()); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << self_->LogTag() << " ClientCallData.PollContext.Run " |
|
|
|
|
<< self_->DebugString(); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< self_->LogTag() << " ClientCallData.PollContext.Run " < < < < |
|
|
|
|
self_->DebugString(); |
|
|
|
|
CHECK(have_scoped_activity_); |
|
|
|
|
repoll_ = false; |
|
|
|
|
if (self_->send_message() != nullptr) { |
|
|
|
@ -1664,10 +1647,9 @@ void ClientCallData::HookRecvTrailingMetadata(CapturedBatch batch) { |
|
|
|
|
// - return a wrapper around PollTrailingMetadata as the promise.
|
|
|
|
|
ArenaPromise<ServerMetadataHandle> ClientCallData::MakeNextPromise( |
|
|
|
|
CallArgs call_args) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << LogTag() << " ClientCallData.MakeNextPromise " |
|
|
|
|
<< DebugString(); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< LogTag() << " ClientCallData.MakeNextPromise " < < < < |
|
|
|
|
DebugString(); |
|
|
|
|
CHECK_NE(poll_ctx_, nullptr); |
|
|
|
|
CHECK(send_initial_state_ == SendInitialState::kQueued); |
|
|
|
|
send_initial_metadata_batch_->payload->send_initial_metadata |
|
|
|
@ -1727,10 +1709,9 @@ ArenaPromise<ServerMetadataHandle> ClientCallData::MakeNextPromise( |
|
|
|
|
// All polls: await receiving the trailing metadata, then return it to the
|
|
|
|
|
// application.
|
|
|
|
|
Poll<ServerMetadataHandle> ClientCallData::PollTrailingMetadata() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << LogTag() << " ClientCallData.PollTrailingMetadata " |
|
|
|
|
<< DebugString(); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< LogTag() << " ClientCallData.PollTrailingMetadata " < < < < |
|
|
|
|
DebugString(); |
|
|
|
|
CHECK_NE(poll_ctx_, nullptr); |
|
|
|
|
if (send_initial_state_ == SendInitialState::kQueued) { |
|
|
|
|
// First poll: pass the send_initial_metadata op down the stack.
|
|
|
|
@ -2275,10 +2256,8 @@ ArenaPromise<ServerMetadataHandle> ServerCallData::MakeNextPromise( |
|
|
|
|
// All polls: await sending the trailing metadata, then foward it down the
|
|
|
|
|
// stack.
|
|
|
|
|
Poll<ServerMetadataHandle> ServerCallData::PollTrailingMetadata() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << LogTag() |
|
|
|
|
<< " PollTrailingMetadata: " << StateString(send_trailing_state_); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << LogTag() < < < < |
|
|
|
|
" PollTrailingMetadata: " << StateString(send_trailing_state_); |
|
|
|
|
switch (send_trailing_state_) { |
|
|
|
|
case SendTrailingState::kInitial: |
|
|
|
|
case SendTrailingState::kQueuedBehindSendMessage: |
|
|
|
@ -2306,10 +2285,9 @@ void ServerCallData::RecvTrailingMetadataReadyCallback( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ServerCallData::RecvTrailingMetadataReady(grpc_error_handle error) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(channel)) { |
|
|
|
|
LOG(INFO) << LogTag() << ": RecvTrailingMetadataReady error=" << error |
|
|
|
|
<< " md=" << recv_trailing_metadata_->DebugString(); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< LogTag() << ": RecvTrailingMetadataReady error=" << error < < < < |
|
|
|
|
" md=" << recv_trailing_metadata_->DebugString(); |
|
|
|
|
Flusher flusher(this); |
|
|
|
|
PollContext poll_ctx(this, &flusher); |
|
|
|
|
Completed(error, recv_trailing_metadata_->get(GrpcTarPit()).has_value(), |
|
|
|
|