Address current PR comments.

pull/16898/head
Spencer Fang 6 years ago
parent 7cc2660ae5
commit 066949ee56
  1. 96
      src/core/ext/filters/client_channel/client_channel.cc
  2. 7
      src/core/ext/filters/client_channel/lb_policy.h

@ -934,9 +934,9 @@ typedef struct client_channel_call_data {
grpc_closure pick_cancel_closure; grpc_closure pick_cancel_closure;
// A closure to fork notifying the lb interceptor and run the original trailer // A closure to fork notifying the lb interceptor and run the original trailer
// interception callback. // interception callback.
grpc_closure lb_intercept_recv_trailing_metadata_ready; grpc_closure recv_trailing_metadata_ready_for_lb;
// The original trailer interception callback. // The original trailer interception callback.
grpc_closure* before_lb_intercept_recv_trailing_metadata_ready; grpc_closure* original_recv_trailing_metadata_ready;
grpc_polling_entity* pollent; grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties; bool pollent_added_to_interested_parties;
@ -999,6 +999,9 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error); static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored);
static void maybe_intercept_trailing_metadata_for_lb(
void* arg, grpc_transport_stream_op_batch* batch);
static void recv_trailing_metadata_ready_for_lb(void* arg, grpc_error* error);
// //
// send op data caching // send op data caching
@ -1273,51 +1276,6 @@ static void resume_pending_batch_in_call_combiner(void* arg,
grpc_subchannel_call_process_op(subchannel_call, batch); grpc_subchannel_call_process_op(subchannel_call, batch);
} }
// The callback to intercept trailing metadata if retries is not enabled
static void recv_trailing_metadata_ready_for_lb(void* arg, grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
GPR_ASSERT(calld->pick.recv_trailing_metadata_ready != nullptr);
GPR_ASSERT(calld->pick.recv_trailing_metadata != nullptr);
GRPC_CLOSURE_SCHED(
calld->pick.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
calld->pick.recv_trailing_metadata = nullptr;
calld->pick.recv_trailing_metadata_ready = nullptr;
GRPC_CLOSURE_RUN(
calld->before_lb_intercept_recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
// Installs a interceptor to inform the lb of the trailing metadata, if needed
static void maybe_intercept_trailing_metadata_for_lb(
void* arg, grpc_transport_stream_op_batch* batch) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->pick.recv_trailing_metadata_ready != nullptr) {
GPR_ASSERT(calld->pick.recv_trailing_metadata != nullptr);
// Unlike the retries case, the location of the trailing metadata is known
// already, so just point to it now.
*calld->pick.recv_trailing_metadata =
batch_data->batch.payload->recv_trailing_metadata
.recv_trailing_metadata;
// There may be a pre-existing recv_trailing_metadata_ready callback
calld->before_lb_intercept_recv_trailing_metadata_ready =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&calld->lb_intercept_recv_trailing_metadata_ready,
recv_trailing_metadata_ready_for_lb, elem,
grpc_schedule_on_exec_ctx);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->lb_intercept_recv_trailing_metadata_ready;
}
}
// This is called via the call combiner, so access to calld is synchronized. // This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) { static void pending_batches_resume(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@ -2043,15 +2001,12 @@ static void recv_trailing_metadata_ready_for_retries(
// Not retrying, so commit the call. // Not retrying, so commit the call.
retry_commit(elem, retry_state); retry_commit(elem, retry_state);
// Now that the try is committed, give the trailer to the lb policy as needed // Now that the try is committed, give the trailer to the lb policy as needed
if (calld->pick.recv_trailing_metadata_ready != nullptr) { if (calld->pick.recv_trailing_metadata != nullptr) {
GPR_ASSERT(calld->pick.recv_trailing_metadata != nullptr);
*calld->pick.recv_trailing_metadata = md_batch; *calld->pick.recv_trailing_metadata = md_batch;
GRPC_CLOSURE_SCHED(
calld->pick.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
calld->pick.recv_trailing_metadata = nullptr;
calld->pick.recv_trailing_metadata_ready = nullptr;
} }
GRPC_CLOSURE_SCHED(
calld->pick.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
// Run any necessary closures. // Run any necessary closures.
run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
} }
@ -2638,6 +2593,39 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
// LB pick // LB pick
// //
// The callback to intercept trailing metadata if retries is not enabled
static void recv_trailing_metadata_ready_for_lb(void* arg, grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->pick.recv_trailing_metadata != nullptr) {
*calld->pick.recv_trailing_metadata =
batch_data->batch.payload->recv_trailing_metadata
.recv_trailing_metadata;
}
GRPC_CLOSURE_SCHED(
calld->pick.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
GRPC_CLOSURE_RUN(
calld->original_recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
// Installs a interceptor to inform the lb of the trailing metadata, if needed
static void maybe_intercept_trailing_metadata_for_lb(
void* arg, grpc_transport_stream_op_batch* batch) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->original_recv_trailing_metadata_ready =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_for_lb,
recv_trailing_metadata_ready_for_lb, elem,
grpc_schedule_on_exec_ctx);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_for_lb;
}
static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data); call_data* calld = static_cast<call_data*>(elem->call_data);

@ -74,10 +74,11 @@ class LoadBalancingPolicy
/// If null, pick will fail if a result is not available synchronously. /// If null, pick will fail if a result is not available synchronously.
grpc_closure* on_complete; grpc_closure* on_complete;
// Callback set by lb policy if the trailing metadata should be intercepted. // Callback set by lb policy to be notified of trailing metadata.
grpc_closure* recv_trailing_metadata_ready; grpc_closure* recv_trailing_metadata_ready;
// If \a recv_trailing_metadata_ready \a is set, the client_channel sets // If this is not nullptr, then the client channel will point it to the
// this pointer to the metadata batch and schedules the closure. // call's trailing metadata before invoking recv_trailing_metadata_ready.
// If this is nullptr, then the callback will still be called.
grpc_metadata_batch** recv_trailing_metadata; grpc_metadata_batch** recv_trailing_metadata;
/// Will be set to the selected subchannel, or nullptr on failure or when /// Will be set to the selected subchannel, or nullptr on failure or when

Loading…
Cancel
Save