|
|
|
@ -207,8 +207,8 @@ void BaseCallData::CapturedBatch::ResumeWith(Flusher* releaser) { |
|
|
|
|
uintptr_t& refcnt = *RefCountField(batch); |
|
|
|
|
if (refcnt == 0) { |
|
|
|
|
// refcnt==0 ==> cancelled
|
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << releaser->call()->DebugTag() < < < < |
|
|
|
|
"RESUME BATCH REQUEST CANCELLED"; |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< releaser->call()->DebugTag() << "RESUME BATCH REQUEST CANCELLED"; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (--refcnt == 0) { |
|
|
|
@ -265,8 +265,8 @@ BaseCallData::Flusher::~Flusher() { |
|
|
|
|
BaseCallData* call = |
|
|
|
|
static_cast<BaseCallData*>(batch->handler_private.extra_arg); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< "FLUSHER:forward batch via closure: " < < < < |
|
|
|
|
grpc_transport_stream_op_batch_string(batch, false); |
|
|
|
|
<< "FLUSHER:forward batch via closure: " |
|
|
|
|
<< grpc_transport_stream_op_batch_string(batch, false); |
|
|
|
|
grpc_call_next_op(call->elem(), batch); |
|
|
|
|
GRPC_CALL_STACK_UNREF(call->call_stack(), "flusher_batch"); |
|
|
|
|
}; |
|
|
|
@ -276,8 +276,8 @@ BaseCallData::Flusher::~Flusher() { |
|
|
|
|
batch->is_traced = true; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< "FLUSHER:queue batch to forward in closure: " < < < < |
|
|
|
|
grpc_transport_stream_op_batch_string(release_[i], false); |
|
|
|
|
<< "FLUSHER:queue batch to forward in closure: " |
|
|
|
|
<< grpc_transport_stream_op_batch_string(release_[i], false); |
|
|
|
|
batch->handler_private.extra_arg = call_; |
|
|
|
|
GRPC_CLOSURE_INIT(&batch->handler_private.closure, call_next_op, batch, |
|
|
|
|
nullptr); |
|
|
|
@ -286,8 +286,9 @@ BaseCallData::Flusher::~Flusher() { |
|
|
|
|
"flusher_batch"); |
|
|
|
|
} |
|
|
|
|
call_closures_.RunClosuresWithoutYielding(call_->call_combiner()); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << "FLUSHER:forward batch: " < < < < |
|
|
|
|
grpc_transport_stream_op_batch_string(release_[0], false); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< "FLUSHER:forward batch: " |
|
|
|
|
<< grpc_transport_stream_op_batch_string(release_[0], false); |
|
|
|
|
if (call_->call() != nullptr && call_->call()->traced()) { |
|
|
|
|
release_[0]->is_traced = true; |
|
|
|
|
} |
|
|
|
@ -325,8 +326,8 @@ const char* BaseCallData::SendMessage::StateString(State state) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void BaseCallData::SendMessage::StartOp(CapturedBatch batch) { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" SendMessage.StartOp st=" << StateString(state_); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< base_->LogTag() << " SendMessage.StartOp st=" << StateString(state_); |
|
|
|
|
switch (state_) { |
|
|
|
|
case State::kInitial: |
|
|
|
|
state_ = State::kGotBatchNoPipe; |
|
|
|
@ -351,8 +352,8 @@ void BaseCallData::SendMessage::StartOp(CapturedBatch batch) { |
|
|
|
|
|
|
|
|
|
template <typename T> |
|
|
|
|
void BaseCallData::SendMessage::GotPipe(T* pipe_end) { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" SendMessage.GotPipe st=" << StateString(state_); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< base_->LogTag() << " SendMessage.GotPipe st=" << StateString(state_); |
|
|
|
|
CHECK_NE(pipe_end, nullptr); |
|
|
|
|
switch (state_) { |
|
|
|
|
case State::kInitial: |
|
|
|
@ -605,8 +606,9 @@ const char* BaseCallData::ReceiveMessage::StateString(State state) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" ReceiveMessage.StartOp st=" << StateString(state_); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< base_->LogTag() |
|
|
|
|
<< " ReceiveMessage.StartOp st=" << StateString(state_); |
|
|
|
|
switch (state_) { |
|
|
|
|
case State::kInitial: |
|
|
|
|
state_ = State::kForwardedBatchNoPipe; |
|
|
|
@ -644,8 +646,9 @@ void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) { |
|
|
|
|
|
|
|
|
|
template <typename T> |
|
|
|
|
void BaseCallData::ReceiveMessage::GotPipe(T* pipe_end) { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" ReceiveMessage.GotPipe st=" << StateString(state_); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< base_->LogTag() |
|
|
|
|
<< " ReceiveMessage.GotPipe st=" << StateString(state_); |
|
|
|
|
switch (state_) { |
|
|
|
|
case State::kInitial: |
|
|
|
|
state_ = State::kIdle; |
|
|
|
@ -887,8 +890,9 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher, |
|
|
|
|
case State::kPulledFromPipe: { |
|
|
|
|
CHECK(push_.has_value()); |
|
|
|
|
if ((*push_)().ready()) { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << base_->LogTag() < < < < |
|
|
|
|
" ReceiveMessage.WakeInsideCombiner push complete"; |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< base_->LogTag() |
|
|
|
|
<< " ReceiveMessage.WakeInsideCombiner push complete"; |
|
|
|
|
if (state_ == State::kCompletedWhilePulledFromPipe) { |
|
|
|
|
interceptor()->Push()->Close(); |
|
|
|
|
state_ = State::kCancelled; |
|
|
|
@ -1001,8 +1005,8 @@ class ClientCallData::PollContext { |
|
|
|
|
void Run() { |
|
|
|
|
DCHECK(HasContext<Arena>()); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< self_->LogTag() << " ClientCallData.PollContext.Run " < < < < |
|
|
|
|
self_->DebugString(); |
|
|
|
|
<< self_->LogTag() << " ClientCallData.PollContext.Run " |
|
|
|
|
<< self_->DebugString(); |
|
|
|
|
CHECK(have_scoped_activity_); |
|
|
|
|
repoll_ = false; |
|
|
|
|
if (self_->send_message() != nullptr) { |
|
|
|
@ -1648,8 +1652,7 @@ void ClientCallData::HookRecvTrailingMetadata(CapturedBatch batch) { |
|
|
|
|
ArenaPromise<ServerMetadataHandle> ClientCallData::MakeNextPromise( |
|
|
|
|
CallArgs call_args) { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< LogTag() << " ClientCallData.MakeNextPromise " < < < < |
|
|
|
|
DebugString(); |
|
|
|
|
<< LogTag() << " ClientCallData.MakeNextPromise " << DebugString(); |
|
|
|
|
CHECK_NE(poll_ctx_, nullptr); |
|
|
|
|
CHECK(send_initial_state_ == SendInitialState::kQueued); |
|
|
|
|
send_initial_metadata_batch_->payload->send_initial_metadata |
|
|
|
@ -1710,8 +1713,7 @@ ArenaPromise<ServerMetadataHandle> ClientCallData::MakeNextPromise( |
|
|
|
|
// application.
|
|
|
|
|
Poll<ServerMetadataHandle> ClientCallData::PollTrailingMetadata() { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< LogTag() << " ClientCallData.PollTrailingMetadata " < < < < |
|
|
|
|
DebugString(); |
|
|
|
|
<< LogTag() << " ClientCallData.PollTrailingMetadata " << DebugString(); |
|
|
|
|
CHECK_NE(poll_ctx_, nullptr); |
|
|
|
|
if (send_initial_state_ == SendInitialState::kQueued) { |
|
|
|
|
// First poll: pass the send_initial_metadata op down the stack.
|
|
|
|
@ -2256,8 +2258,9 @@ ArenaPromise<ServerMetadataHandle> ServerCallData::MakeNextPromise( |
|
|
|
|
// All polls: await sending the trailing metadata, then foward it down the
|
|
|
|
|
// stack.
|
|
|
|
|
Poll<ServerMetadataHandle> ServerCallData::PollTrailingMetadata() { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) << LogTag() < < < < |
|
|
|
|
" PollTrailingMetadata: " << StateString(send_trailing_state_); |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< LogTag() |
|
|
|
|
<< " PollTrailingMetadata: " << StateString(send_trailing_state_); |
|
|
|
|
switch (send_trailing_state_) { |
|
|
|
|
case SendTrailingState::kInitial: |
|
|
|
|
case SendTrailingState::kQueuedBehindSendMessage: |
|
|
|
@ -2286,8 +2289,8 @@ void ServerCallData::RecvTrailingMetadataReadyCallback( |
|
|
|
|
|
|
|
|
|
void ServerCallData::RecvTrailingMetadataReady(grpc_error_handle error) { |
|
|
|
|
GRPC_TRACE_LOG(channel, INFO) |
|
|
|
|
<< LogTag() << ": RecvTrailingMetadataReady error=" << error < < < < |
|
|
|
|
" md=" << recv_trailing_metadata_->DebugString(); |
|
|
|
|
<< LogTag() << ": RecvTrailingMetadataReady error=" << error |
|
|
|
|
<< " md=" << recv_trailing_metadata_->DebugString(); |
|
|
|
|
Flusher flusher(this); |
|
|
|
|
PollContext poll_ctx(this, &flusher); |
|
|
|
|
Completed(error, recv_trailing_metadata_->get(GrpcTarPit()).has_value(), |
|
|
|
|