diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 4691aa1c971..388736b60a3 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -936,7 +936,7 @@ typedef struct client_channel_call_data { // state needed to support channelz interception of recv trailing metadata. grpc_closure recv_trailing_metadata_ready_channelz; grpc_closure* original_recv_trailing_metadata; - grpc_metadata_batch* recv_trailing_metadata_batch; + grpc_metadata_batch* recv_trailing_metadata; grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -999,7 +999,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); -static void maybe_intercept_metadata_for_channelz( +static void maybe_intercept_recv_trailing_metadata_for_channelz( grpc_call_element* elem, grpc_transport_stream_op_batch* batch); // @@ -1299,7 +1299,7 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - maybe_intercept_metadata_for_channelz(elem, batch); + maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, @@ -2589,6 +2589,69 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { closures.RunClosures(calld->call_combiner); } +// +// Channelz +// + +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_call_element* elem = static_cast(arg); + channel_data* chand = static_cast(elem->channel_data); + call_data* calld = static_cast(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " + "error=%s", + chand, calld, grpc_error_string(error)); + } + GPR_ASSERT(calld->recv_trailing_metadata != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = calld->recv_trailing_metadata; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + calld->recv_trailing_metadata = nullptr; + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +// Returns true if callback was intercepted, false otherwise. +static void maybe_intercept_recv_trailing_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast(elem->call_data); + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "calld=%p batch=%p: intercepting recv trailing for channelz", calld, + batch); + } + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(calld->recv_trailing_metadata == nullptr); + calld->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; +} + // // LB pick // @@ -2669,65 +2732,6 @@ static void pick_done(void* arg, grpc_error* error) { } } -static void recv_trailing_metadata_ready_channelz(void* arg, - grpc_error* error) { - grpc_call_element* elem = static_cast(arg); - channel_data* chand = static_cast(elem->channel_data); - call_data* calld = static_cast(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " - "error=%s", - chand, calld, grpc_error_string(error)); - } - GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr); - grpc_status_code status = GRPC_STATUS_OK; - grpc_metadata_batch* md_batch = calld->recv_trailing_metadata_batch; - get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); - grpc_core::channelz::SubchannelNode* channelz_subchannel = - calld->pick.connected_subchannel->channelz_subchannel(); - GPR_ASSERT(channelz_subchannel != nullptr); - if (status == GRPC_STATUS_OK) { - channelz_subchannel->RecordCallSucceeded(); - } else { - channelz_subchannel->RecordCallFailed(); - } - calld->recv_trailing_metadata_batch = nullptr; - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); -} - -// If channelz is enabled, intercept recv_trailing so that we may check the -// status and associate it to a subchannel. -// Returns true if callback was intercepted, false otherwise. -static void maybe_intercept_metadata_for_channelz( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - call_data* calld = static_cast(elem->call_data); - // only intercept payloads with recv trailing. - if (!batch->recv_trailing_metadata) { - return; - } - // only add interceptor is channelz is enabled. - if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { - return; - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "calld=%p batch=%p: intercepting recv trailing for channelz", calld, - batch); - } - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, - recv_trailing_metadata_ready_channelz, elem, - grpc_schedule_on_exec_ctx); - // save some state needed for the interception callback. - GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr); - calld->recv_trailing_metadata_batch = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - calld->original_recv_trailing_metadata = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->recv_trailing_metadata_ready_channelz; -} - static void maybe_add_call_to_channel_interested_parties_locked( grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index 4a4f0e49d00..e2ea334dedf 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -126,13 +126,11 @@ static void con_start_transport_stream_op_batch( // closure for each one. callback_state* state = static_cast(gpr_malloc(sizeof(*state))); - intercept_callback(calld, state, true, - "connected_on_complete (cancel_stream)", + intercept_callback(calld, state, true, "on_complete (cancel_stream)", &batch->on_complete); } else if (batch->on_complete != nullptr) { callback_state* state = get_state_for_batch(calld, batch); - intercept_callback(calld, state, false, "connected_on_complete", - &batch->on_complete); + intercept_callback(calld, state, false, "on_complete", &batch->on_complete); } grpc_transport_perform_stream_op( chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch);