reviewer feedback

reviewable/pr16055/r16
ncteisen 7 years ago
parent 4b5b019d56
commit be8844bcdb
  1. 133
      src/core/ext/filters/client_channel/client_channel.cc
  2. 7
      src/core/lib/channel/channelz.h
  3. 6
      src/core/lib/channel/connected_channel.cc
  4. 8
      src/core/lib/surface/call.cc
  5. 4
      test/core/channel/channel_trace_test.cc
  6. 4
      test/core/channel/channelz_test.cc

@ -937,7 +937,7 @@ typedef struct client_channel_call_data {
// state needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_channelz;
grpc_closure* original_recv_trailing_metadata;
grpc_transport_stream_op_batch* recv_trailing_metadata_batch;
grpc_metadata_batch* recv_trailing_metadata_batch;
grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties;
@ -1000,14 +1000,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
template <typename Predicate>
static pending_batch* pending_batch_find(grpc_call_element* elem,
const char* log_message,
Predicate predicate);
static void get_call_status(grpc_call_element* elem,
grpc_metadata_batch* md_batch, grpc_error* error,
grpc_status_code* status,
grpc_mdelem** server_pushback_md);
static void maybe_intercept_metadata_for_channelz(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
//
// send op data caching
@ -1282,66 +1276,6 @@ static void resume_pending_batch_in_call_combiner(void* arg,
grpc_subchannel_call_process_op(subchannel_call, batch);
}
static void recv_trailing_metadata_ready_channelz(void* arg,
grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
"error=%s",
chand, calld, grpc_error_string(error));
}
GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr);
grpc_status_code status = GRPC_STATUS_OK;
grpc_metadata_batch* md_batch =
calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata
.recv_trailing_metadata;
get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
grpc_core::channelz::SubchannelNode* channelz_subchannel =
calld->pick.connected_subchannel->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
if (status == GRPC_STATUS_OK) {
channelz_subchannel->RecordCallSucceeded();
} else {
channelz_subchannel->RecordCallFailed();
}
calld->recv_trailing_metadata_batch = nullptr;
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
}
// If channelz is enabled, intercept recv_trailing so that we may check the
// status and associate it to a subchannel.
// Returns true if callback was intercepted, false otherwise.
static void maybe_intercept_recv_trailing_for_channelz(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// only intercept payloads with recv trailing.
if (!batch->recv_trailing_metadata) {
return;
}
// only add interceptor is channelz is enabled.
if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
return;
}
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"calld=%p batch=%p: intercepting recv trailing for channelz", calld,
batch);
}
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
recv_trailing_metadata_ready_channelz, elem,
grpc_schedule_on_exec_ctx);
// save some state needed for the interception callback.
GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr);
calld->recv_trailing_metadata_batch = batch;
calld->original_recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_channelz;
}
// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@ -1366,7 +1300,7 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
maybe_intercept_recv_trailing_for_channelz(elem, batch);
maybe_intercept_metadata_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
@ -2736,6 +2670,65 @@ static void pick_done(void* arg, grpc_error* error) {
}
}
static void recv_trailing_metadata_ready_channelz(void* arg,
grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
"error=%s",
chand, calld, grpc_error_string(error));
}
GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr);
grpc_status_code status = GRPC_STATUS_OK;
grpc_metadata_batch* md_batch = calld->recv_trailing_metadata_batch;
get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
grpc_core::channelz::SubchannelNode* channelz_subchannel =
calld->pick.connected_subchannel->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
if (status == GRPC_STATUS_OK) {
channelz_subchannel->RecordCallSucceeded();
} else {
channelz_subchannel->RecordCallFailed();
}
calld->recv_trailing_metadata_batch = nullptr;
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
}
// If channelz is enabled, intercept recv_trailing so that we may check the
// status and associate it to a subchannel.
// Returns true if callback was intercepted, false otherwise.
static void maybe_intercept_metadata_for_channelz(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// only intercept payloads with recv trailing.
if (!batch->recv_trailing_metadata) {
return;
}
// only add interceptor is channelz is enabled.
if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
return;
}
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"calld=%p batch=%p: intercepting recv trailing for channelz", calld,
batch);
}
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
recv_trailing_metadata_ready_channelz, elem,
grpc_schedule_on_exec_ctx);
// save some state needed for the interception callback.
GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr);
calld->recv_trailing_metadata_batch =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->original_recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_channelz;
}
static void maybe_add_call_to_channel_interested_parties_locked(
grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);

@ -79,7 +79,7 @@ class BaseNode : public RefCounted<BaseNode> {
const intptr_t uuid_;
};
// This class is a helper class for channelz entities that deal with Channels
// This class is a helper class for channelz entities that deal with Channels,
// Subchannels, and Servers, since those have similar proto definitions.
// This class has the ability to:
// - track calls_{started,succeeded,failed}
@ -133,6 +133,9 @@ class ChannelNode : public BaseNode {
// so it leaves these implementations blank.
//
// This is utilizing the template method design pattern.
//
// TODO(ncteisen): remove these template methods in favor of manual traversal
// and mutation of the grpc_json object.
virtual void PopulateConnectivityState(grpc_json* json) {}
virtual void PopulateChildRefs(grpc_json* json) {}
@ -158,7 +161,7 @@ class ChannelNode : public BaseNode {
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
// to allow the channel trace test to access trace();
// to allow the channel trace test to access trace_.
friend class testing::ChannelNodePeer;
grpc_channel* channel_ = nullptr;
UniquePtr<char> target_;

@ -104,18 +104,18 @@ static void con_start_transport_stream_op_batch(
if (batch->recv_initial_metadata) {
callback_state* state = &calld->recv_initial_metadata_ready;
intercept_callback(
calld, state, false, "connected_recv_initial_metadata_ready",
calld, state, false, "recv_initial_metadata_ready",
&batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
}
if (batch->recv_message) {
callback_state* state = &calld->recv_message_ready;
intercept_callback(calld, state, false, "connected_recv_message_ready",
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
if (batch->recv_trailing_metadata) {
callback_state* state = &calld->recv_trailing_metadata_ready;
intercept_callback(
calld, state, false, "connected_recv_trailing_metadata_ready",
calld, state, false, "recv_trailing_metadata_ready",
&batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
}
if (batch->cancel_stream) {

@ -1425,7 +1425,7 @@ static void receiving_stream_ready_in_call_combiner(void* bctlp,
grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_recv_message_ready");
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
receiving_stream_ready(bctlp, error);
}
@ -1510,8 +1510,7 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner,
"call_recv_initial_metadata_ready");
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
@ -1562,8 +1561,7 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner,
"call_recv_trailing_metadata_ready");
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];

@ -44,8 +44,8 @@ namespace testing {
// testing peer to access channel internals
class ChannelNodePeer {
public:
ChannelNodePeer(ChannelNode* node) : node_(node) {}
ChannelTrace* trace() { return &node_->trace_; }
explicit ChannelNodePeer(ChannelNode* node) : node_(node) {}
ChannelTrace* trace() const { return &node_->trace_; }
private:
ChannelNode* node_;

@ -46,8 +46,8 @@ namespace testing {
// testing peer to access channel internals
class CallCountingHelperPeer {
public:
CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {}
grpc_millis last_call_started_millis() {
explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {}
grpc_millis last_call_started_millis() const {
return (grpc_millis)gpr_atm_no_barrier_load(
&node_->last_call_started_millis_);
}

Loading…
Cancel
Save