Fix bug with proxy end2end test

reviewable/pr16055/r13
ncteisen 6 years ago
parent ccf04c4511
commit e0ae6c73ec
  1. 58
      src/core/ext/filters/client_channel/client_channel.cc

@ -934,8 +934,10 @@ typedef struct client_channel_call_data {
grpc_closure pick_closure; grpc_closure pick_closure;
grpc_closure pick_cancel_closure; grpc_closure pick_cancel_closure;
// state needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_channelz; grpc_closure recv_trailing_metadata_ready_channelz;
grpc_closure* original_recv_trailing_metadata; grpc_closure* original_recv_trailing_metadata;
grpc_transport_stream_op_batch* recv_trailing_metadata_batch;
grpc_polling_entity* pollent; grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties; bool pollent_added_to_interested_parties;
@ -1291,17 +1293,11 @@ static void recv_trailing_metadata_ready_channelz(void* arg,
"error=%s", "error=%s",
chand, calld, grpc_error_string(error)); chand, calld, grpc_error_string(error));
} }
// find the right pending batch. GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr);
pending_batch* pending = pending_batch_find(
elem, "invoking recv_trailing_metadata_channelz for",
[](grpc_transport_stream_op_batch* batch) {
return batch->recv_trailing_metadata &&
batch->payload->recv_trailing_metadata
.recv_trailing_metadata_ready != nullptr;
});
grpc_status_code status = GRPC_STATUS_OK; grpc_status_code status = GRPC_STATUS_OK;
grpc_metadata_batch* md_batch = grpc_metadata_batch* md_batch =
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata; calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata
.recv_trailing_metadata;
get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
grpc_core::channelz::SubchannelNode* channelz_subchannel = grpc_core::channelz::SubchannelNode* channelz_subchannel =
calld->pick.connected_subchannel->channelz_subchannel(); calld->pick.connected_subchannel->channelz_subchannel();
@ -1311,29 +1307,39 @@ static void recv_trailing_metadata_ready_channelz(void* arg,
} else { } else {
channelz_subchannel->RecordCallFailed(); channelz_subchannel->RecordCallFailed();
} }
pending->batch = nullptr; calld->recv_trailing_metadata_batch = nullptr;
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
} }
// If channelz is enabled, intercept recv_trailing so that we may check the // If channelz is enabled, intercept recv_trailing so that we may check the
// status and associate it to a subchannel. // status and associate it to a subchannel.
// Returns true if callback was intercepted, false otherwise. // Returns true if callback was intercepted, false otherwise.
static bool maybe_intercept_recv_trailing_for_channelz( static void maybe_intercept_recv_trailing_for_channelz(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data); call_data* calld = static_cast<call_data*>(elem->call_data);
// only intercept payloads with recv trailing.
if (!batch->recv_trailing_metadata) {
return;
}
// only add interceptor is channelz is enabled. // only add interceptor is channelz is enabled.
if (calld->pick.connected_subchannel->channelz_subchannel() != nullptr) { if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, return;
recv_trailing_metadata_ready_channelz, elem, }
grpc_schedule_on_exec_ctx); if (grpc_client_channel_trace.enabled()) {
calld->original_recv_trailing_metadata = gpr_log(GPR_INFO,
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = batch);
&calld->recv_trailing_metadata_ready_channelz;
return true;
} else {
return false;
} }
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
recv_trailing_metadata_ready_channelz, elem,
grpc_schedule_on_exec_ctx);
// save some state needed for the interception callback.
GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr);
calld->recv_trailing_metadata_batch = batch;
calld->original_recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_channelz;
} }
// This is called via the call combiner, so access to calld is synchronized. // This is called via the call combiner, so access to calld is synchronized.
@ -1360,18 +1366,14 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i]; pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch; grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) { if (batch != nullptr) {
bool intercepted = maybe_intercept_recv_trailing_for_channelz(elem, batch);
maybe_intercept_recv_trailing_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call; batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch, resume_pending_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
"pending_batches_resume"); "pending_batches_resume");
// Only clear if we haven't intercepted anything. pending_batch_clear(calld, pending);
if (!intercepted) {
pending_batch_clear(calld, pending);
}
} }
} }
// Note: This will release the call combiner. // Note: This will release the call combiner.

Loading…
Cancel
Save