diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8b4f1b604ce..d3a4c498214 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -934,8 +934,10 @@ typedef struct client_channel_call_data { grpc_closure pick_closure; grpc_closure pick_cancel_closure; + // state needed to support channelz interception of recv trailing metadata. grpc_closure recv_trailing_metadata_ready_channelz; grpc_closure* original_recv_trailing_metadata; + grpc_transport_stream_op_batch* recv_trailing_metadata_batch; grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -1291,17 +1293,11 @@ static void recv_trailing_metadata_ready_channelz(void* arg, "error=%s", chand, calld, grpc_error_string(error)); } - // find the right pending batch. - pending_batch* pending = pending_batch_find( - elem, "invoking recv_trailing_metadata_channelz for", - [](grpc_transport_stream_op_batch* batch) { - return batch->recv_trailing_metadata && - batch->payload->recv_trailing_metadata - .recv_trailing_metadata_ready != nullptr; - }); + GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr); grpc_status_code status = GRPC_STATUS_OK; grpc_metadata_batch* md_batch = - pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata + .recv_trailing_metadata; get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); grpc_core::channelz::SubchannelNode* channelz_subchannel = calld->pick.connected_subchannel->channelz_subchannel(); @@ -1311,29 +1307,39 @@ static void recv_trailing_metadata_ready_channelz(void* arg, } else { channelz_subchannel->RecordCallFailed(); } - pending->batch = nullptr; + calld->recv_trailing_metadata_batch = nullptr; GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); } // If channelz is enabled, intercept recv_trailing so that we may check the // status and associate it to a subchannel. // Returns true if callback was intercepted, false otherwise. -static bool maybe_intercept_recv_trailing_for_channelz( +static void maybe_intercept_recv_trailing_for_channelz( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = static_cast(elem->call_data); + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } // only add interceptor is channelz is enabled. - if (calld->pick.connected_subchannel->channelz_subchannel() != nullptr) { - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, - recv_trailing_metadata_ready_channelz, elem, - grpc_schedule_on_exec_ctx); - calld->original_recv_trailing_metadata = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->recv_trailing_metadata_ready_channelz; - return true; - } else { - return false; + if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "calld=%p batch=%p: intercepting recv trailing for channelz", calld, + batch); } + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr); + calld->recv_trailing_metadata_batch = batch; + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; } // This is called via the call combiner, so access to calld is synchronized. @@ -1360,18 +1366,14 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - bool intercepted = - maybe_intercept_recv_trailing_for_channelz(elem, batch); + maybe_intercept_recv_trailing_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, "pending_batches_resume"); - // Only clear if we haven't intercepted anything. - if (!intercepted) { - pending_batch_clear(calld, pending); - } + pending_batch_clear(calld, pending); } } // Note: This will release the call combiner.