From be8844bcdb704cff6a70507f5093e4bb26320ea3 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 7 Sep 2018 15:44:59 -0700 Subject: [PATCH] reviewer feedback --- .../filters/client_channel/client_channel.cc | 133 +++++++++--------- src/core/lib/channel/channelz.h | 7 +- src/core/lib/channel/connected_channel.cc | 6 +- src/core/lib/surface/call.cc | 8 +- test/core/channel/channel_trace_test.cc | 4 +- test/core/channel/channelz_test.cc | 4 +- 6 files changed, 78 insertions(+), 84 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index d015ceb3359..000cf82c6cb 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -937,7 +937,7 @@ typedef struct client_channel_call_data { // state needed to support channelz interception of recv trailing metadata. grpc_closure recv_trailing_metadata_ready_channelz; grpc_closure* original_recv_trailing_metadata; - grpc_transport_stream_op_batch* recv_trailing_metadata_batch; + grpc_metadata_batch* recv_trailing_metadata_batch; grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -1000,14 +1000,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); -template -static pending_batch* pending_batch_find(grpc_call_element* elem, - const char* log_message, - Predicate predicate); -static void get_call_status(grpc_call_element* elem, - grpc_metadata_batch* md_batch, grpc_error* error, - grpc_status_code* status, - grpc_mdelem** server_pushback_md); +static void maybe_intercept_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); // // send op data caching @@ -1282,66 +1276,6 @@ static void resume_pending_batch_in_call_combiner(void* arg, grpc_subchannel_call_process_op(subchannel_call, batch); } -static void recv_trailing_metadata_ready_channelz(void* arg, - grpc_error* error) { - grpc_call_element* elem = static_cast(arg); - channel_data* chand = static_cast(elem->channel_data); - call_data* calld = static_cast(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " - "error=%s", - chand, calld, grpc_error_string(error)); - } - GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr); - grpc_status_code status = GRPC_STATUS_OK; - grpc_metadata_batch* md_batch = - calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata - .recv_trailing_metadata; - get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); - grpc_core::channelz::SubchannelNode* channelz_subchannel = - calld->pick.connected_subchannel->channelz_subchannel(); - GPR_ASSERT(channelz_subchannel != nullptr); - if (status == GRPC_STATUS_OK) { - channelz_subchannel->RecordCallSucceeded(); - } else { - channelz_subchannel->RecordCallFailed(); - } - calld->recv_trailing_metadata_batch = nullptr; - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); -} - -// If channelz is enabled, intercept recv_trailing so that we may check the -// status and associate it to a subchannel. -// Returns true if callback was intercepted, false otherwise. -static void maybe_intercept_recv_trailing_for_channelz( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - call_data* calld = static_cast(elem->call_data); - // only intercept payloads with recv trailing. - if (!batch->recv_trailing_metadata) { - return; - } - // only add interceptor is channelz is enabled. - if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { - return; - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "calld=%p batch=%p: intercepting recv trailing for channelz", calld, - batch); - } - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, - recv_trailing_metadata_ready_channelz, elem, - grpc_schedule_on_exec_ctx); - // save some state needed for the interception callback. - GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr); - calld->recv_trailing_metadata_batch = batch; - calld->original_recv_trailing_metadata = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->recv_trailing_metadata_ready_channelz; -} - // This is called via the call combiner, so access to calld is synchronized. static void pending_batches_resume(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); @@ -1366,7 +1300,7 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - maybe_intercept_recv_trailing_for_channelz(elem, batch); + maybe_intercept_metadata_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, @@ -2736,6 +2670,65 @@ static void pick_done(void* arg, grpc_error* error) { } } +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_call_element* elem = static_cast(arg); + channel_data* chand = static_cast(elem->channel_data); + call_data* calld = static_cast(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " + "error=%s", + chand, calld, grpc_error_string(error)); + } + GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = calld->recv_trailing_metadata_batch; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + calld->recv_trailing_metadata_batch = nullptr; + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +// Returns true if callback was intercepted, false otherwise. +static void maybe_intercept_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast(elem->call_data); + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "calld=%p batch=%p: intercepting recv trailing for channelz", calld, + batch); + } + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr); + calld->recv_trailing_metadata_batch = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; +} + static void maybe_add_call_to_channel_interested_parties_locked( grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index bd2735929c8..db5d05140db 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -79,7 +79,7 @@ class BaseNode : public RefCounted { const intptr_t uuid_; }; -// This class is a helper class for channelz entities that deal with Channels +// This class is a helper class for channelz entities that deal with Channels, // Subchannels, and Servers, since those have similar proto definitions. // This class has the ability to: // - track calls_{started,succeeded,failed} @@ -133,6 +133,9 @@ class ChannelNode : public BaseNode { // so it leaves these implementations blank. // // This is utilizing the template method design pattern. + // + // TODO(ncteisen): remove these template methods in favor of manual traversal + // and mutation of the grpc_json object. virtual void PopulateConnectivityState(grpc_json* json) {} virtual void PopulateChildRefs(grpc_json* json) {} @@ -158,7 +161,7 @@ class ChannelNode : public BaseNode { void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - // to allow the channel trace test to access trace(); + // to allow the channel trace test to access trace_. friend class testing::ChannelNodePeer; grpc_channel* channel_ = nullptr; UniquePtr target_; diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index 90a02546633..4a4f0e49d00 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -104,18 +104,18 @@ static void con_start_transport_stream_op_batch( if (batch->recv_initial_metadata) { callback_state* state = &calld->recv_initial_metadata_ready; intercept_callback( - calld, state, false, "connected_recv_initial_metadata_ready", + calld, state, false, "recv_initial_metadata_ready", &batch->payload->recv_initial_metadata.recv_initial_metadata_ready); } if (batch->recv_message) { callback_state* state = &calld->recv_message_ready; - intercept_callback(calld, state, false, "connected_recv_message_ready", + intercept_callback(calld, state, false, "recv_message_ready", &batch->payload->recv_message.recv_message_ready); } if (batch->recv_trailing_metadata) { callback_state* state = &calld->recv_trailing_metadata_ready; intercept_callback( - calld, state, false, "connected_recv_trailing_metadata_ready", + calld, state, false, "recv_trailing_metadata_ready", &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready); } if (batch->cancel_stream) { diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 3d69db4f83c..eb7e67233b7 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1425,7 +1425,7 @@ static void receiving_stream_ready_in_call_combiner(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_recv_message_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready"); receiving_stream_ready(bctlp, error); } @@ -1510,8 +1510,7 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, - "call_recv_initial_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { @@ -1562,8 +1561,7 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, - "call_recv_trailing_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; diff --git a/test/core/channel/channel_trace_test.cc b/test/core/channel/channel_trace_test.cc index e33277753b5..d594445bfb9 100644 --- a/test/core/channel/channel_trace_test.cc +++ b/test/core/channel/channel_trace_test.cc @@ -44,8 +44,8 @@ namespace testing { // testing peer to access channel internals class ChannelNodePeer { public: - ChannelNodePeer(ChannelNode* node) : node_(node) {} - ChannelTrace* trace() { return &node_->trace_; } + explicit ChannelNodePeer(ChannelNode* node) : node_(node) {} + ChannelTrace* trace() const { return &node_->trace_; } private: ChannelNode* node_; diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc index 8fa46a18dae..09189fa36df 100644 --- a/test/core/channel/channelz_test.cc +++ b/test/core/channel/channelz_test.cc @@ -46,8 +46,8 @@ namespace testing { // testing peer to access channel internals class CallCountingHelperPeer { public: - CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {} - grpc_millis last_call_started_millis() { + explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {} + grpc_millis last_call_started_millis() const { return (grpc_millis)gpr_atm_no_barrier_load( &node_->last_call_started_millis_); }