From 066949ee56a84189006abeab2e947594787738f6 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Fri, 19 Oct 2018 09:32:06 -0700 Subject: [PATCH] Address current PR comments. --- .../filters/client_channel/client_channel.cc | 96 ++++++++----------- .../ext/filters/client_channel/lb_policy.h | 7 +- 2 files changed, 46 insertions(+), 57 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index f57612b2413..9732b1753a8 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -934,9 +934,9 @@ typedef struct client_channel_call_data { grpc_closure pick_cancel_closure; // A closure to fork notifying the lb interceptor and run the original trailer // interception callback. - grpc_closure lb_intercept_recv_trailing_metadata_ready; + grpc_closure recv_trailing_metadata_ready_for_lb; // The original trailer interception callback. - grpc_closure* before_lb_intercept_recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -999,6 +999,9 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); +static void maybe_intercept_trailing_metadata_for_lb( + void* arg, grpc_transport_stream_op_batch* batch); +static void recv_trailing_metadata_ready_for_lb(void* arg, grpc_error* error); // // send op data caching @@ -1273,51 +1276,6 @@ static void resume_pending_batch_in_call_combiner(void* arg, grpc_subchannel_call_process_op(subchannel_call, batch); } -// The callback to intercept trailing metadata if retries is not enabled -static void recv_trailing_metadata_ready_for_lb(void* arg, grpc_error* error) { - subchannel_batch_data* batch_data = static_cast(arg); - grpc_call_element* elem = batch_data->elem; - call_data* calld = static_cast(elem->call_data); - GPR_ASSERT(calld->pick.recv_trailing_metadata_ready != nullptr); - GPR_ASSERT(calld->pick.recv_trailing_metadata != nullptr); - - GRPC_CLOSURE_SCHED( - calld->pick.recv_trailing_metadata_ready, - GRPC_ERROR_REF(error)); - calld->pick.recv_trailing_metadata = nullptr; - calld->pick.recv_trailing_metadata_ready = nullptr; - - GRPC_CLOSURE_RUN( - calld->before_lb_intercept_recv_trailing_metadata_ready, - GRPC_ERROR_REF(error)); -} - -// Installs a interceptor to inform the lb of the trailing metadata, if needed -static void maybe_intercept_trailing_metadata_for_lb( - void* arg, grpc_transport_stream_op_batch* batch) { - subchannel_batch_data* batch_data = static_cast(arg); - grpc_call_element* elem = batch_data->elem; - call_data* calld = static_cast(elem->call_data); - if (calld->pick.recv_trailing_metadata_ready != nullptr) { - GPR_ASSERT(calld->pick.recv_trailing_metadata != nullptr); - // Unlike the retries case, the location of the trailing metadata is known - // already, so just point to it now. - *calld->pick.recv_trailing_metadata = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; - - // There may be a pre-existing recv_trailing_metadata_ready callback - calld->before_lb_intercept_recv_trailing_metadata_ready = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - - GRPC_CLOSURE_INIT(&calld->lb_intercept_recv_trailing_metadata_ready, - recv_trailing_metadata_ready_for_lb, elem, - grpc_schedule_on_exec_ctx); - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->lb_intercept_recv_trailing_metadata_ready; - } -} - // This is called via the call combiner, so access to calld is synchronized. static void pending_batches_resume(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); @@ -2043,15 +2001,12 @@ static void recv_trailing_metadata_ready_for_retries( // Not retrying, so commit the call. retry_commit(elem, retry_state); // Now that the try is committed, give the trailer to the lb policy as needed - if (calld->pick.recv_trailing_metadata_ready != nullptr) { - GPR_ASSERT(calld->pick.recv_trailing_metadata != nullptr); + if (calld->pick.recv_trailing_metadata != nullptr) { *calld->pick.recv_trailing_metadata = md_batch; - GRPC_CLOSURE_SCHED( - calld->pick.recv_trailing_metadata_ready, - GRPC_ERROR_REF(error)); - calld->pick.recv_trailing_metadata = nullptr; - calld->pick.recv_trailing_metadata_ready = nullptr; } + GRPC_CLOSURE_SCHED( + calld->pick.recv_trailing_metadata_ready, + GRPC_ERROR_REF(error)); // Run any necessary closures. run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); } @@ -2638,6 +2593,39 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { // LB pick // +// The callback to intercept trailing metadata if retries is not enabled +static void recv_trailing_metadata_ready_for_lb(void* arg, grpc_error* error) { + subchannel_batch_data* batch_data = static_cast(arg); + grpc_call_element* elem = batch_data->elem; + call_data* calld = static_cast(elem->call_data); + if (calld->pick.recv_trailing_metadata != nullptr) { + *calld->pick.recv_trailing_metadata = + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata; + } + GRPC_CLOSURE_SCHED( + calld->pick.recv_trailing_metadata_ready, + GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN( + calld->original_recv_trailing_metadata_ready, + GRPC_ERROR_REF(error)); +} + +// Installs a interceptor to inform the lb of the trailing metadata, if needed +static void maybe_intercept_trailing_metadata_for_lb( + void* arg, grpc_transport_stream_op_batch* batch) { + subchannel_batch_data* batch_data = static_cast(arg); + grpc_call_element* elem = batch_data->elem; + call_data* calld = static_cast(elem->call_data); + calld->original_recv_trailing_metadata_ready = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_for_lb, + recv_trailing_metadata_ready_for_lb, elem, + grpc_schedule_on_exec_ctx); + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_for_lb; +} + static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 9ef0033aebd..6c04b4b54cf 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -74,10 +74,11 @@ class LoadBalancingPolicy /// If null, pick will fail if a result is not available synchronously. grpc_closure* on_complete; - // Callback set by lb policy if the trailing metadata should be intercepted. + // Callback set by lb policy to be notified of trailing metadata. grpc_closure* recv_trailing_metadata_ready; - // If \a recv_trailing_metadata_ready \a is set, the client_channel sets - // this pointer to the metadata batch and schedules the closure. + // If this is not nullptr, then the client channel will point it to the + // call's trailing metadata before invoking recv_trailing_metadata_ready. + // If this is nullptr, then the callback will still be called. grpc_metadata_batch** recv_trailing_metadata; /// Will be set to the selected subchannel, or nullptr on failure or when