diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 3f3334d44a3..e99022a91b8 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -553,7 +553,6 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); - grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } chand->lb_policy = new_lb_policy; @@ -659,7 +658,6 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { if (chand->lb_policy != nullptr) { grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); - grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); chand->lb_policy = nullptr; } @@ -794,7 +792,6 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { if (chand->lb_policy != nullptr) { grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); - grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } gpr_free(chand->info_lb_policy_name); @@ -855,10 +852,12 @@ typedef struct client_channel_call_data { grpc_subchannel_call* subchannel_call; grpc_error* error; - grpc_lb_policy_pick_state pick; + grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending. grpc_closure lb_pick_closure; grpc_closure lb_pick_cancel_closure; + grpc_connected_subchannel* connected_subchannel; + grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity* pollent; grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES]; @@ -867,6 +866,8 @@ typedef struct client_channel_call_data { grpc_transport_stream_op_batch* initial_metadata_batch; + grpc_linked_mdelem lb_token_mdelem; + grpc_closure on_complete; grpc_closure* original_on_complete; } call_data; @@ -1004,16 +1005,16 @@ static void create_subchannel_call_locked(grpc_call_element* elem, channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; const grpc_connected_subchannel_call_args call_args = { - calld->pollent, // pollent - calld->path, // path - calld->call_start_time, // start_time - calld->deadline, // deadline - calld->arena, // arena - calld->pick.subchannel_call_context, // context - calld->call_combiner // call_combiner + calld->pollent, // pollent + calld->path, // path + calld->call_start_time, // start_time + calld->deadline, // deadline + calld->arena, // arena + calld->subchannel_call_context, // context + calld->call_combiner // call_combiner }; grpc_error* new_error = grpc_connected_subchannel_create_call( - calld->pick.connected_subchannel, &call_args, &calld->subchannel_call); + calld->connected_subchannel, &call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); @@ -1031,7 +1032,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - if (calld->pick.connected_subchannel == nullptr) { + if (calld->connected_subchannel == nullptr) { // Failed to create subchannel. GRPC_ERROR_UNREF(calld->error); calld->error = error == GRPC_ERROR_NONE @@ -1070,16 +1071,13 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - // Note: chand->lb_policy may have changed since we started our pick, - // in which case we will be cancelling the pick on a policy other than - // the one we started it on. However, this will just be a no-op. - if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) { + if (calld->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", - chand, calld, chand->lb_policy); + chand, calld, calld->lb_policy); } - grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick, - GRPC_ERROR_REF(error)); + grpc_lb_policy_cancel_pick_locked( + calld->lb_policy, &calld->connected_subchannel, GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); } @@ -1094,6 +1092,9 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", chand, calld); } + GPR_ASSERT(calld->lb_policy != nullptr); + GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel"); + calld->lb_policy = nullptr; async_pick_done_locked(elem, GRPC_ERROR_REF(error)); } @@ -1127,21 +1128,26 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; } } - calld->pick.initial_metadata = + const grpc_lb_policy_pick_args inputs = { calld->initial_metadata_batch->payload->send_initial_metadata - .send_initial_metadata; - calld->pick.initial_metadata_flags = initial_metadata_flags; + .send_initial_metadata, + initial_metadata_flags, &calld->lb_token_mdelem}; + // Keep a ref to the LB policy in calld while the pick is pending. + GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); + calld->lb_policy = chand->lb_policy; GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); - calld->pick.on_complete = &calld->lb_pick_closure; - const bool pick_done = - grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick); + const bool pick_done = grpc_lb_policy_pick_locked( + chand->lb_policy, &inputs, &calld->connected_subchannel, + calld->subchannel_call_context, nullptr, &calld->lb_pick_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", chand, calld); } + GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel"); + calld->lb_policy = nullptr; } else { GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); grpc_call_combiner_set_notify_on_cancel( @@ -1283,7 +1289,7 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - GPR_ASSERT(calld->pick.connected_subchannel == nullptr); + GPR_ASSERT(calld->connected_subchannel == nullptr); if (chand->lb_policy != nullptr) { // We already have an LB policy, so ask it for a pick. if (pick_callback_start_locked(elem)) { @@ -1461,14 +1467,15 @@ static void cc_destroy_call_elem(grpc_call_element* elem, GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, "client_channel_destroy_call"); } + GPR_ASSERT(calld->lb_policy == nullptr); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); - if (calld->pick.connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked"); + if (calld->connected_subchannel != nullptr) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->connected_subchannel, "picked"); } for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { - if (calld->pick.subchannel_call_context[i].value != nullptr) { - calld->pick.subchannel_call_context[i].destroy( - calld->pick.subchannel_call_context[i].value); + if (calld->subchannel_call_context[i].value != nullptr) { + calld->subchannel_call_context[i].destroy( + calld->subchannel_call_context[i].value); } } GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index cc4fe7ec627..7a5a8dec34b 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -19,6 +19,8 @@ #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/lib/iomgr/combiner.h" +#define WEAK_REF_BITS 16 + grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount( false, "lb_policy_refcount"); @@ -26,60 +28,91 @@ void grpc_lb_policy_init(grpc_lb_policy* policy, const grpc_lb_policy_vtable* vtable, grpc_combiner* combiner) { policy->vtable = vtable; - gpr_ref_init(&policy->refs, 1); + gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); policy->interested_parties = grpc_pollset_set_create(); policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy"); } #ifndef NDEBUG -void grpc_lb_policy_ref(grpc_lb_policy* lb_policy, const char* file, int line, - const char* reason) { - if (grpc_trace_lb_policy_refcount.enabled()) { - gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "LB_POLICY:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy, - old_refs, old_refs + 1, reason); - } +#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason +#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char* purpose +#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason +#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose #else -void grpc_lb_policy_ref(grpc_lb_policy* lb_policy) { +#define REF_FUNC_EXTRA_ARGS +#define REF_MUTATE_EXTRA_ARGS +#define REF_FUNC_PASS_ARGS(new_reason) +#define REF_MUTATE_PASS_ARGS(x) #endif - gpr_ref(&lb_policy->refs); -} +static gpr_atm ref_mutate(grpc_lb_policy* c, gpr_atm delta, + int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) + : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifndef NDEBUG -void grpc_lb_policy_unref(grpc_lb_policy* lb_policy, const char* file, int line, - const char* reason) { if (grpc_trace_lb_policy_refcount.enabled()) { - gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "LB_POLICY:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy, - old_refs, old_refs - 1, reason); + "LB_POLICY: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c, + purpose, old_val, old_val + delta, reason); } -#else -void grpc_lb_policy_unref(grpc_lb_policy* lb_policy) { #endif - if (gpr_unref(&lb_policy->refs)) { - grpc_pollset_set_destroy(lb_policy->interested_parties); - grpc_combiner* combiner = lb_policy->combiner; - lb_policy->vtable->destroy(lb_policy); - GRPC_COMBINER_UNREF(combiner, "lb_policy"); + return old_val; +} + +void grpc_lb_policy_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { + ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); +} + +static void shutdown_locked(void* arg, grpc_error* error) { + grpc_lb_policy* policy = (grpc_lb_policy*)arg; + policy->vtable->shutdown_locked(policy); + GRPC_LB_POLICY_WEAK_UNREF(policy, "strong-unref"); +} + +void grpc_lb_policy_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = + ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS), + 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); + gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); + gpr_atm check = 1 << WEAK_REF_BITS; + if ((old_val & mask) == check) { + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(shutdown_locked, policy, + grpc_combiner_scheduler(policy->combiner)), + GRPC_ERROR_NONE); + } else { + grpc_lb_policy_weak_unref(policy REF_FUNC_PASS_ARGS("strong-unref")); } } -void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy, - grpc_lb_policy* new_policy) { - policy->vtable->shutdown_locked(policy, new_policy); +void grpc_lb_policy_weak_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { + ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); +} + +void grpc_lb_policy_weak_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = + ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); + if (old_val == 1) { + grpc_pollset_set_destroy(policy->interested_parties); + grpc_combiner* combiner = policy->combiner; + policy->vtable->destroy(policy); + GRPC_COMBINER_UNREF(combiner, "lb_policy"); + } } int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick) { - return policy->vtable->pick_locked(policy, pick); + const grpc_lb_policy_pick_args* pick_args, + grpc_connected_subchannel** target, + grpc_call_context_element* context, + void** user_data, grpc_closure* on_complete) { + return policy->vtable->pick_locked(policy, pick_args, target, context, + user_data, on_complete); } void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick, + grpc_connected_subchannel** target, grpc_error* error) { - policy->vtable->cancel_pick_locked(policy, pick, error); + policy->vtable->cancel_pick_locked(policy, target, error); } void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy, diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 1176a05b780..3572c97ed14 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -33,7 +33,7 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount; struct grpc_lb_policy { const grpc_lb_policy_vtable* vtable; - gpr_refcount refs; + gpr_atm ref_pair; /* owned pointer to interested parties in load balancing decisions */ grpc_pollset_set* interested_parties; /* combiner under which lb_policy actions take place */ @@ -42,42 +42,32 @@ struct grpc_lb_policy { grpc_closure* request_reresolution; }; -/// State used for an LB pick. -typedef struct grpc_lb_policy_pick_state { - /// Initial metadata associated with the picking call. +/** Extra arguments for an LB pick */ +typedef struct grpc_lb_policy_pick_args { + /** Initial metadata associated with the picking call. */ grpc_metadata_batch* initial_metadata; - /// Bitmask used for selective cancelling. See \a - /// grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in - /// grpc_types.h. + /** Bitmask used for selective cancelling. See \a + * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in + * grpc_types.h */ uint32_t initial_metadata_flags; - /// Storage for LB token in \a initial_metadata, or NULL if not used. - grpc_linked_mdelem lb_token_mdelem_storage; - /// Closure to run when pick is complete, if not completed synchronously. - grpc_closure* on_complete; - /// Will be set to the selected subchannel, or NULL on failure or when - /// the LB policy decides to drop the call. - grpc_connected_subchannel* connected_subchannel; - /// Will be populated with context to pass to the subchannel call, if needed. - grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; - /// Upon success, \a *user_data will be set to whatever opaque information - /// may need to be propagated from the LB policy, or NULL if not needed. - void** user_data; - /// Next pointer. For internal use by LB policy. - struct grpc_lb_policy_pick_state* next; -} grpc_lb_policy_pick_state; + /** Storage for LB token in \a initial_metadata, or NULL if not used */ + grpc_linked_mdelem* lb_token_mdelem_storage; +} grpc_lb_policy_pick_args; struct grpc_lb_policy_vtable { void (*destroy)(grpc_lb_policy* policy); - - /// \see grpc_lb_policy_shutdown_locked(). - void (*shutdown_locked)(grpc_lb_policy* policy, grpc_lb_policy* new_policy); + void (*shutdown_locked)(grpc_lb_policy* policy); /** \see grpc_lb_policy_pick */ - int (*pick_locked)(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick); + int (*pick_locked)(grpc_lb_policy* policy, + const grpc_lb_policy_pick_args* pick_args, + grpc_connected_subchannel** target, + grpc_call_context_element* context, void** user_data, + grpc_closure* on_complete); /** \see grpc_lb_policy_cancel_pick */ void (*cancel_pick_locked)(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick, + grpc_connected_subchannel** target, grpc_error* error); /** \see grpc_lb_policy_cancel_picks */ @@ -113,19 +103,37 @@ struct grpc_lb_policy_vtable { }; #ifndef NDEBUG + +/* Strong references: the policy will shutdown when they reach zero */ #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(p, r) \ grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) + +/* Weak references: they don't prevent the shutdown of the LB policy. When no + * strong references are left but there are still weak ones, shutdown is called. + * Once the weak reference also reaches zero, the LB policy is destroyed. */ +#define GRPC_LB_POLICY_WEAK_REF(p, r) \ + grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_WEAK_UNREF(p, r) \ + grpc_lb_policy_weak_unref((p), __FILE__, __LINE__, (r)) void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line, const char* reason); void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line, const char* reason); -#else // !NDEBUG +void grpc_lb_policy_weak_ref(grpc_lb_policy* policy, const char* file, int line, + const char* reason); +void grpc_lb_policy_weak_unref(grpc_lb_policy* policy, const char* file, + int line, const char* reason); +#else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p)) +#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p)) +#define GRPC_LB_POLICY_WEAK_UNREF(p, r) grpc_lb_policy_weak_unref((p)) void grpc_lb_policy_ref(grpc_lb_policy* policy); void grpc_lb_policy_unref(grpc_lb_policy* policy); +void grpc_lb_policy_weak_ref(grpc_lb_policy* policy); +void grpc_lb_policy_weak_unref(grpc_lb_policy* policy); #endif /** called by concrete implementations to initialize the base struct */ @@ -133,24 +141,28 @@ void grpc_lb_policy_init(grpc_lb_policy* policy, const grpc_lb_policy_vtable* vtable, grpc_combiner* combiner); -/// Shuts down \a policy. -/// If \a new_policy is non-null, any pending picks will be restarted -/// on that policy; otherwise, they will be failed. -void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy, - grpc_lb_policy* new_policy); +/** Finds an appropriate subchannel for a call, based on \a pick_args. + + \a target will be set to the selected subchannel, or NULL on failure + or when the LB policy decides to drop the call. -/** Finds an appropriate subchannel for a call, based on data in \a pick. - \a pick must remain alive until the pick is complete. + Upon success, \a user_data will be set to whatever opaque information + may need to be propagated from the LB policy, or NULL if not needed. + \a context will be populated with context to pass to the subchannel + call, if needed. If the pick succeeds and a result is known immediately, a non-zero - value will be returned. Otherwise, \a pick->on_complete will be invoked + value will be returned. Otherwise, \a on_complete will be invoked once the pick is complete with its error argument set to indicate success or failure. Any IO should be done under the \a interested_parties \a grpc_pollset_set in the \a grpc_lb_policy struct. */ int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick); + const grpc_lb_policy_pick_args* pick_args, + grpc_connected_subchannel** target, + grpc_call_context_element* context, + void** user_data, grpc_closure* on_complete); /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) against one of the connected subchannels managed by \a policy. */ @@ -158,11 +170,11 @@ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, grpc_closure* on_initiate, grpc_closure* on_ack); -/** Cancel picks for \a pick. +/** Cancel picks for \a target. The \a on_complete callback of the pending picks will be invoked with \a *target set to NULL. */ void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick, + grpc_connected_subchannel** target, grpc_error* error); /** Cancel all pending picks for which their \a initial_metadata_flags (as given diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 5849ac9d2da..1317cdcf750 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -54,7 +54,7 @@ * operations in progress over the old RR instance. This is done by * decreasing the reference count on the old policy. The moment no more * references are held on the old RR policy, it'll be destroyed and \a - * on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN + * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN * state. At this point we can transition to a new RR instance safely, which * is done once again via \a rr_handover_locked(). * @@ -128,48 +128,187 @@ grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb"); -struct glb_lb_policy; +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static grpc_error* initial_metadata_add_lb_token( + grpc_metadata_batch* initial_metadata, + grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) { + GPR_ASSERT(lb_token_mdelem_storage != nullptr); + GPR_ASSERT(!GRPC_MDISNULL(lb_token)); + return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} -namespace { +static void destroy_client_stats(void* arg) { + grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg); +} -/// Linked list of pending pick requests. It stores all information needed to -/// eventually call (Round Robin's) pick() on them. They mainly stay pending -/// waiting for the RR policy to be created. -/// -/// Note that when a pick is sent to the RR policy, we inject our own -/// on_complete callback, so that we can intercept the result before -/// invoking the original on_complete callback. This allows us to set the -/// LB token metadata and add client_stats to the call context. -/// See \a pending_pick_complete() for details. -struct pending_pick { - // Our on_complete closure and the original one. - grpc_closure on_complete; - grpc_closure* original_on_complete; - // The original pick. - grpc_lb_policy_pick_state* pick; - // Stats for client-side load reporting. Note that this holds a - // reference, which must be either passed on via context or unreffed. +typedef struct wrapped_rr_closure_arg { + /* the closure instance using this struct as argument */ + grpc_closure wrapper_closure; + + /* the original closure. Usually a on_complete/notify cb for pick() and ping() + * calls against the internal RR instance, respectively. */ + grpc_closure* wrapped_closure; + + /* the pick's initial metadata, kept in order to append the LB token for the + * pick */ + grpc_metadata_batch* initial_metadata; + + /* the picked target, used to determine which LB token to add to the pick's + * initial metadata */ + grpc_connected_subchannel** target; + + /* the context to be populated for the subchannel call */ + grpc_call_context_element* context; + + /* Stats for client-side load reporting. Note that this holds a + * reference, which must be either passed on via context or unreffed. */ grpc_grpclb_client_stats* client_stats; - // The LB token associated with the pick. This is set via user_data in - // the pick. + + /* the LB token associated with the pick */ grpc_mdelem lb_token; - // The grpclb instance that created the wrapping. This instance is not owned, - // reference counts are untouched. It's used only for logging purposes. - glb_lb_policy* glb_policy; - // Next pending pick. + + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem* lb_token_mdelem_storage; + + /* The RR instance related to the closure */ + grpc_lb_policy* rr_policy; + + /* The grpclb instance that created the wrapping. This instance is not owned, + * reference counts are untouched. It's used only for logging purposes. */ + grpc_lb_policy* glb_policy; + + /* heap memory to be freed upon closure execution. */ + void* free_when_done; +} wrapped_rr_closure_arg; + +/* The \a on_complete closure passed as part of the pick requires keeping a + * reference to its associated round robin instance. We wrap this closure in + * order to unref the round robin instance upon its invocation */ +static void wrapped_rr_closure(void* arg, grpc_error* error) { + wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg; + + GPR_ASSERT(wc_arg->wrapped_closure != nullptr); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); + + if (wc_arg->rr_policy != nullptr) { + /* if *target is nullptr, no pick has been made by the RR policy (eg, all + * addresses failed to connect). There won't be any user_data/token + * available */ + if (*wc_arg->target != nullptr) { + if (!GRPC_MDISNULL(wc_arg->lb_token)) { + initial_metadata_add_lb_token(wc_arg->initial_metadata, + wc_arg->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + } else { + gpr_log( + GPR_ERROR, + "[grpclb %p] No LB token for connected subchannel pick %p (from RR " + "instance %p).", + wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy); + abort(); + } + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(wc_arg->client_stats != nullptr); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + } else { + grpc_grpclb_client_stats_unref(wc_arg->client_stats); + } + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy, + wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure"); + } + GPR_ASSERT(wc_arg->free_when_done != nullptr); + gpr_free(wc_arg->free_when_done); +} + +namespace { +/* Linked list of pending pick requests. It stores all information needed to + * eventually call (Round Robin's) pick() on them. They mainly stay pending + * waiting for the RR policy to be created/updated. + * + * One particularity is the wrapping of the user-provided \a on_complete closure + * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in + * order to correctly unref the RR policy instance upon completion of the pick. + * See \a wrapped_rr_closure for details. */ +struct pending_pick { struct pending_pick* next; + + /* original pick()'s arguments */ + grpc_lb_policy_pick_args pick_args; + + /* output argument where to store the pick()ed connected subchannel, or + * nullptr upon error. */ + grpc_connected_subchannel** target; + + /* args for wrapped_on_complete */ + wrapped_rr_closure_arg wrapped_on_complete_arg; }; +} // namespace + +static void add_pending_pick(pending_pick** root, + const grpc_lb_policy_pick_args* pick_args, + grpc_connected_subchannel** target, + grpc_call_context_element* context, + grpc_closure* on_complete) { + pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp)); + pp->next = *root; + pp->pick_args = *pick_args; + pp->target = target; + pp->wrapped_on_complete_arg.wrapped_closure = on_complete; + pp->wrapped_on_complete_arg.target = target; + pp->wrapped_on_complete_arg.context = context; + pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; + pp->wrapped_on_complete_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; + pp->wrapped_on_complete_arg.free_when_done = pp; + GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure, + wrapped_rr_closure, &pp->wrapped_on_complete_arg, + grpc_schedule_on_exec_ctx); + *root = pp; +} -/// A linked list of pending pings waiting for the RR policy to be created. -struct pending_ping { - grpc_closure* on_initiate; - grpc_closure* on_ack; +/* Same as the \a pending_pick struct but for ping operations */ +typedef struct pending_ping { struct pending_ping* next; -}; -} // namespace + /* args for sending the ping */ + wrapped_rr_closure_arg* on_initiate; + wrapped_rr_closure_arg* on_ack; +} pending_ping; + +static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate, + grpc_closure* on_ack) { + pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); + if (on_initiate != nullptr) { + pping->on_initiate = + (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate)); + pping->on_initiate->wrapped_closure = on_initiate; + pping->on_initiate->free_when_done = pping->on_initiate; + GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure, + &pping->on_initiate, grpc_schedule_on_exec_ctx); + } + if (on_ack != nullptr) { + pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack)); + pping->on_ack->wrapped_closure = on_ack; + pping->on_ack->free_when_done = pping->on_ack; + GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure, + &pping->on_ack, grpc_schedule_on_exec_ctx); + } + pping->next = *root; + *root = pping; +} -struct glb_lb_policy { +/* + * glb_lb_policy + */ +typedef struct rr_connectivity_data rr_connectivity_data; + +typedef struct glb_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -194,9 +333,6 @@ struct glb_lb_policy { /** the RR policy to use of the backend servers returned by the LB server */ grpc_lb_policy* rr_policy; - grpc_closure on_rr_connectivity_changed; - grpc_connectivity_state rr_connectivity_state; - bool started_picking; /** our connectivity state tracker */ @@ -301,84 +437,14 @@ struct glb_lb_policy { grpc_closure client_load_report_closure; /* Client load report message payload. */ grpc_byte_buffer* client_load_report_payload; -}; - -/* add lb_token of selected subchannel (address) to the call's initial - * metadata */ -static grpc_error* initial_metadata_add_lb_token( - grpc_metadata_batch* initial_metadata, - grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) { - GPR_ASSERT(lb_token_mdelem_storage != nullptr); - GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, - lb_token); -} - -static void destroy_client_stats(void* arg) { - grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg); -} +} glb_lb_policy; -static void pending_pick_set_metadata_and_context(pending_pick* pp) { - /* if connected_subchannel is nullptr, no pick has been made by the RR - * policy (e.g., all addresses failed to connect). There won't be any - * user_data/token available */ - if (pp->pick->connected_subchannel != nullptr) { - if (!GRPC_MDISNULL(pp->lb_token)) { - initial_metadata_add_lb_token(pp->pick->initial_metadata, - &pp->pick->lb_token_mdelem_storage, - GRPC_MDELEM_REF(pp->lb_token)); - } else { - gpr_log(GPR_ERROR, - "[grpclb %p] No LB token for connected subchannel pick %p", - pp->glb_policy, pp->pick); - abort(); - } - // Pass on client stats via context. Passes ownership of the reference. - GPR_ASSERT(pp->client_stats != nullptr); - pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = - pp->client_stats; - pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = - destroy_client_stats; - } else { - grpc_grpclb_client_stats_unref(pp->client_stats); - } -} - -/* The \a on_complete closure passed as part of the pick requires keeping a - * reference to its associated round robin instance. We wrap this closure in - * order to unref the round robin instance upon its invocation */ -static void pending_pick_complete(void* arg, grpc_error* error) { - pending_pick* pp = (pending_pick*)arg; - pending_pick_set_metadata_and_context(pp); - GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); - gpr_free(pp); -} - -static pending_pick* pending_pick_create(glb_lb_policy* glb_policy, - grpc_lb_policy_pick_state* pick) { - pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp)); - pp->pick = pick; - pp->glb_policy = glb_policy; - GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp, - grpc_schedule_on_exec_ctx); - pp->original_on_complete = pick->on_complete; - pp->pick->on_complete = &pp->on_complete; - return pp; -} - -static void pending_pick_add(pending_pick** root, pending_pick* new_pp) { - new_pp->next = *root; - *root = new_pp; -} - -static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate, - grpc_closure* on_ack) { - pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); - pping->on_initiate = on_initiate; - pping->on_ack = on_ack; - pping->next = *root; - *root = pping; -} +/* Keeps track and reacts to changes in connectivity of the RR instance */ +struct rr_connectivity_data { + grpc_closure on_change; + grpc_connectivity_state state; + glb_lb_policy* glb_policy; +}; static bool is_server_valid(const grpc_grpclb_server* server, size_t idx, bool log) { @@ -491,6 +557,7 @@ static grpc_lb_addresses* process_serverlist_locked( gpr_free(uri); user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; } + grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, false /* is_balancer */, nullptr /* balancer_name */, user_data); @@ -531,6 +598,7 @@ static void update_lb_connectivity_status_locked( grpc_error* rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); + /* The new connectivity status is a function of the previous one and the new * input coming from the status of the RR policy. * @@ -560,6 +628,7 @@ static void update_lb_connectivity_status_locked( * * (*) This function mustn't be called during shutting down. */ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); + switch (rr_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: @@ -570,6 +639,7 @@ static void update_lb_connectivity_status_locked( case GRPC_CHANNEL_READY: GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); } + if (grpc_lb_glb_trace.enabled()) { gpr_log( GPR_INFO, @@ -587,8 +657,10 @@ static void update_lb_connectivity_status_locked( * cleanups this callback would otherwise be responsible for. * If \a force_async is true, then we will manually schedule the * completion callback even if the pick is available immediately. */ -static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy, - bool force_async, pending_pick* pp) { +static bool pick_from_internal_rr_locked( + glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args, + bool force_async, grpc_connected_subchannel** target, + wrapped_rr_closure_arg* wc_arg) { // Check for drops if we are not using fallback backend addresses. if (glb_policy->serverlist != nullptr) { // Look at the index into the serverlist to see if we should drop this call. @@ -598,36 +670,57 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy, glb_policy->serverlist_index = 0; // Wrap-around. } if (server->drop) { + // Not using the RR policy, so unref it. + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy, + wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); // Update client load reporting stats to indicate the number of // dropped calls. Note that we have to do this here instead of in // the client_load_reporting filter, because we do not create a // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. - GPR_ASSERT(glb_policy->client_stats != nullptr); + GPR_ASSERT(wc_arg->client_stats != nullptr); grpc_grpclb_client_stats_add_call_dropped_locked( - server->load_balance_token, glb_policy->client_stats); + server->load_balance_token, wc_arg->client_stats); + grpc_grpclb_client_stats_unref(wc_arg->client_stats); if (force_async) { - GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); - gpr_free(pp); + GPR_ASSERT(wc_arg->wrapped_closure != nullptr); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); return false; } - gpr_free(pp); + gpr_free(wc_arg->free_when_done); return true; } } - // Set client_stats and user_data. - pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats); - GPR_ASSERT(pp->pick->user_data == nullptr); - pp->pick->user_data = (void**)&pp->lb_token; // Pick via the RR policy. - bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick); + const bool pick_done = grpc_lb_policy_pick_locked( + wc_arg->rr_policy, pick_args, target, wc_arg->context, + (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { - pending_pick_set_metadata_and_context(pp); + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy, + wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(wc_arg->client_stats != nullptr); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; if (force_async) { - GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); - pick_done = false; + GPR_ASSERT(wc_arg->wrapped_closure != nullptr); + GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; } - gpr_free(pp); + gpr_free(wc_arg->free_when_done); } /* else, the pending pick will be registered and taken care of by the * pending pick list inside the RR policy (glb_policy->rr_policy). @@ -669,7 +762,7 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) { gpr_free(args); } -static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error); +static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error); static void create_rr_locked(glb_lb_policy* glb_policy, grpc_lb_policy_args* args) { GPR_ASSERT(glb_policy->rr_policy == nullptr); @@ -691,46 +784,72 @@ static void create_rr_locked(glb_lb_policy* glb_policy, glb_policy->base.request_reresolution = nullptr; glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; - glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked( - glb_policy->rr_policy, &rr_state_error); + const grpc_connectivity_state rr_state = + grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy, + &rr_state_error); /* Connectivity state is a function of the RR policy updated/created */ - update_lb_connectivity_status_locked( - glb_policy, glb_policy->rr_connectivity_state, rr_state_error); + update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error); /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); - GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed, - on_rr_connectivity_changed_locked, glb_policy, + + /* Allocate the data for the tracking of the new RR policy's connectivity. + * It'll be deallocated in glb_rr_connectivity_changed() */ + rr_connectivity_data* rr_connectivity = + (rr_connectivity_data*)gpr_zalloc(sizeof(rr_connectivity_data)); + GRPC_CLOSURE_INIT(&rr_connectivity->on_change, + glb_rr_connectivity_changed_locked, rr_connectivity, grpc_combiner_scheduler(glb_policy->base.combiner)); + rr_connectivity->glb_policy = glb_policy; + rr_connectivity->state = rr_state; + /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb"); - grpc_lb_policy_notify_on_state_change_locked( - glb_policy->rr_policy, &glb_policy->rr_connectivity_state, - &glb_policy->on_rr_connectivity_changed); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb"); + grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); - // Send pending picks to RR policy. + + /* Update picks and pings in wait */ pending_pick* pp; while ((pp = glb_policy->pending_picks)) { glb_policy->pending_picks = pp->next; + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); + pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; + pp->wrapped_on_complete_arg.client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending pick about to (async) PICK from RR %p", glb_policy, glb_policy->rr_policy); } - pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp); + pick_from_internal_rr_locked(glb_policy, &pp->pick_args, + true /* force_async */, pp->target, + &pp->wrapped_on_complete_arg); } - // Send pending pings to RR policy. + pending_ping* pping; while ((pping = glb_policy->pending_pings)) { glb_policy->pending_pings = pping->next; + grpc_closure* on_initiate = nullptr; + grpc_closure* on_ack = nullptr; + if (pping->on_initiate != nullptr) { + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); + pping->on_initiate->rr_policy = glb_policy->rr_policy; + on_initiate = &pping->on_initiate->wrapper_closure; + } + if (pping->on_ack != nullptr) { + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); + pping->on_ack->rr_policy = glb_policy->rr_policy; + on_ack = &pping->on_ack->wrapper_closure; + } if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } - grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate, - pping->on_ack); + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); gpr_free(pping); } } @@ -756,28 +875,31 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) { lb_policy_args_destroy(args); } -static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; +static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) { + rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg; + glb_lb_policy* glb_policy = rr_connectivity->glb_policy; if (glb_policy->shutting_down) { - GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); + gpr_free(rr_connectivity); return; } - if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) { /* An RR policy that has transitioned into the SHUTDOWN connectivity state * should not be considered for picks or updates: the SHUTDOWN state is a * sink, policies can't transition back from it. .*/ GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown"); glb_policy->rr_policy = nullptr; - GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); + gpr_free(rr_connectivity); return; } /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ - update_lb_connectivity_status_locked( - glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error)); - /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */ - grpc_lb_policy_notify_on_state_change_locked( - glb_policy->rr_policy, &glb_policy->rr_connectivity_state, - &glb_policy->on_rr_connectivity_changed); + update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state, + GRPC_ERROR_REF(error)); + /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */ + grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); } static void destroy_balancer_name(void* balancer_name) { @@ -885,17 +1007,22 @@ static void glb_destroy(grpc_lb_policy* pol) { gpr_free(glb_policy); } -static void glb_shutdown_locked(grpc_lb_policy* pol, - grpc_lb_policy* new_policy) { +static void glb_shutdown_locked(grpc_lb_policy* pol) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); glb_policy->shutting_down = true; + + /* We need a copy of the lb_call pointer because we can't cancell the call + * while holding glb_policy->mu: lb_on_server_status_received, invoked due to + * the cancel, needs to acquire that same lock */ + grpc_call* lb_call = glb_policy->lb_call; + /* glb_policy->lb_call and this local lb_call must be consistent at this point * because glb_policy->lb_call is only assigned in lb_call_init_locked as part * of query_for_backends_locked, which can only be invoked while * glb_policy->shutting_down is false. */ - if (glb_policy->lb_call != nullptr) { - grpc_call_cancel(glb_policy->lb_call, nullptr); + if (lb_call != nullptr) { + grpc_call_cancel(lb_call, nullptr); /* lb_on_server_status_received will pick up the cancel and clean up */ } if (glb_policy->retry_timer_callback_pending) { @@ -904,8 +1031,12 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, if (glb_policy->fallback_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_fallback_timer); } + + pending_pick* pp = glb_policy->pending_picks; + glb_policy->pending_picks = nullptr; + pending_ping* pping = glb_policy->pending_pings; + glb_policy->pending_pings = nullptr; if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr); GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); } else { grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); @@ -920,33 +1051,28 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, } grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "glb_shutdown"); - // Clear pending picks. - pending_pick* pp = glb_policy->pending_picks; - glb_policy->pending_picks = nullptr; + while (pp != nullptr) { pending_pick* next = pp->next; - if (new_policy != nullptr) { - // Hand pick over to new policy. - grpc_grpclb_client_stats_unref(pp->client_stats); - pp->pick->on_complete = pp->original_on_complete; - if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) { - // Synchronous return; schedule callback. - GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE); - } - gpr_free(pp); - } else { - pp->pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); - } + *pp->target = nullptr; + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pp); pp = next; } - // Clear pending pings. - pending_ping* pping = glb_policy->pending_pings; - glb_policy->pending_pings = nullptr; + while (pping != nullptr) { pending_ping* next = pping->next; - GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error)); - GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error)); + if (pping->on_initiate != nullptr) { + GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pping->on_initiate); + } + if (pping->on_ack != nullptr) { + GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pping->on_ack); + } gpr_free(pping); pping = next; } @@ -964,16 +1090,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, // we invoke the completion closure and set *target to nullptr right here. static void glb_cancel_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick, + grpc_connected_subchannel** target, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; pending_pick* pp = glb_policy->pending_picks; glb_policy->pending_picks = nullptr; while (pp != nullptr) { pending_pick* next = pp->next; - if (pp->pick == pick) { - pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(&pp->on_complete, + if (pp->target == target) { + *target = nullptr; + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -983,7 +1109,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, pp = next; } if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick, + grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); @@ -1008,9 +1134,9 @@ static void glb_cancel_picks_locked(grpc_lb_policy* pol, glb_policy->pending_picks = nullptr; while (pp != nullptr) { pending_pick* next = pp->next; - if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(&pp->on_complete, + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1036,7 +1162,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) { !glb_policy->fallback_timer_callback_pending) { grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; - GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -1058,9 +1184,19 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) { } static int glb_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick) { + const grpc_lb_policy_pick_args* pick_args, + grpc_connected_subchannel** target, + grpc_call_context_element* context, void** user_data, + grpc_closure* on_complete) { + if (pick_args->lb_token_mdelem_storage == nullptr) { + *target = nullptr; + GRPC_CLOSURE_SCHED(on_complete, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No mdelem storage for the LB token. Load reporting " + "won't work without it. Failing")); + return 0; + } glb_lb_policy* glb_policy = (glb_lb_policy*)pol; - pending_pick* pp = pending_pick_create(glb_policy, pick); bool pick_done = false; if (glb_policy->rr_policy != nullptr) { const grpc_connectivity_state rr_connectivity_state = @@ -1068,7 +1204,7 @@ static int glb_pick_locked(grpc_lb_policy* pol, nullptr); // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the // callback registered to capture this event - // (on_rr_connectivity_changed_locked) may not have been invoked yet. We + // (glb_rr_connectivity_changed_locked) may not have been invoked yet. We // need to make sure we aren't trying to pick from a RR policy instance // that's in shutdown. if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { @@ -1078,16 +1214,32 @@ static int glb_pick_locked(grpc_lb_policy* pol, glb_policy, glb_policy->rr_policy, grpc_connectivity_state_name(rr_connectivity_state)); } - pending_pick_add(&glb_policy->pending_picks, pp); + add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, + on_complete); pick_done = false; } else { // RR not in shutdown if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy, glb_policy->rr_policy); } + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); + wrapped_rr_closure_arg* wc_arg = + (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(wrapped_rr_closure_arg)); + GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, + grpc_schedule_on_exec_ctx); + wc_arg->rr_policy = glb_policy->rr_policy; + wc_arg->target = target; + wc_arg->context = context; GPR_ASSERT(glb_policy->client_stats != nullptr); - pick_done = - pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp); + wc_arg->client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); + wc_arg->wrapped_closure = on_complete; + wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; + wc_arg->initial_metadata = pick_args->initial_metadata; + wc_arg->free_when_done = wc_arg; + wc_arg->glb_policy = pol; + pick_done = pick_from_internal_rr_locked( + glb_policy, pick_args, false /* force_async */, target, wc_arg); } } else { // glb_policy->rr_policy == NULL if (grpc_lb_glb_trace.enabled()) { @@ -1095,7 +1247,8 @@ static int glb_pick_locked(grpc_lb_policy* pol, "[grpclb %p] No RR policy. Adding to grpclb's pending picks", glb_policy); } - pending_pick_add(&glb_policy->pending_picks, pp); + add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, + on_complete); if (!glb_policy->started_picking) { start_picking_locked(glb_policy); } @@ -1117,7 +1270,7 @@ static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (glb_policy->rr_policy) { grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); } else { - pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack); + add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack); if (!glb_policy->started_picking) { start_picking_locked(glb_policy); } @@ -1142,7 +1295,7 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { } query_for_backends_locked(glb_policy); } - GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer"); } static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { @@ -1168,7 +1321,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { glb_policy); } } - GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -1176,8 +1329,8 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry); } - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "lb_on_server_status_received_locked"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, + "lb_on_server_status_received_locked"); } static void send_client_load_report_locked(void* arg, grpc_error* error); @@ -1200,7 +1353,7 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) { glb_policy->client_load_report_payload = nullptr; if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) { glb_policy->client_load_report_timer_callback_pending = false; - GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == nullptr) { maybe_restart_lb_call(glb_policy); } @@ -1241,7 +1394,7 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) { glb_policy->client_load_report_timer_callback_pending = false; - GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == nullptr) { maybe_restart_lb_call(glb_policy); } @@ -1394,8 +1547,10 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - /* take a ref to be released in lb_on_sent_initial_request_locked() */ - GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked"); + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, + "lb_on_sent_initial_request_locked"); call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_sent_initial_request); @@ -1411,8 +1566,10 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - /* take a ref to be released in lb_on_server_status_received_locked() */ - GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked"); + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_server_status_received_locked */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, + "lb_on_server_status_received_locked"); call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); @@ -1424,8 +1581,9 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - /* take a ref to be unref'd/reused in lb_on_response_received_locked() */ - GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked"); + /* take another weak ref to be unref'd/reused in + * lb_on_response_received_locked */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked"); call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); @@ -1440,7 +1598,8 @@ static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) { if (glb_policy->client_load_report_payload != nullptr) { do_send_client_load_report_locked(glb_policy); } - GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, + "lb_on_sent_initial_request_locked"); } static void lb_on_response_received_locked(void* arg, grpc_error* error) { @@ -1472,9 +1631,11 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { "client load reporting interval = %" PRIdPTR " milliseconds", glb_policy, glb_policy->client_stats_report_interval); } - /* take a ref to be unref'd in send_client_load_report_locked() */ + /* take a weak ref (won't prevent calling of \a glb_shutdown() if the + * strong ref count goes to zero) to be unref'd in + * send_client_load_report_locked() */ glb_policy->client_load_report_timer_callback_pending = true; - GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); schedule_next_client_load_report(glb_policy); } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, @@ -1556,21 +1717,21 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { op->flags = 0; op->reserved = nullptr; op++; - /* reuse the "lb_on_response_received_locked" ref taken in + /* reuse the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } else { - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "lb_on_response_received_locked_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, + "lb_on_response_received_locked_shutdown"); } } else { /* empty payload: call cancelled. */ - /* dispose of the "lb_on_response_received_locked" ref taken in + /* dispose of the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "lb_on_response_received_locked_empty_payload"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, + "lb_on_response_received_locked_empty_payload"); } } @@ -1590,7 +1751,7 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { rr_handover_locked(glb_policy); } } - GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer"); } static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { @@ -1674,7 +1835,7 @@ static void glb_update_locked(grpc_lb_policy* policy, grpc_channel_get_channel_stack(glb_policy->lb_channel)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); glb_policy->watching_lb_channel = true; - GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); grpc_client_channel_watch_connectivity_state( client_channel_elem, grpc_polling_entity_create_from_pollset_set( @@ -1730,8 +1891,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, case GRPC_CHANNEL_SHUTDOWN: done: glb_policy->watching_lb_channel = false; - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "watch_lb_channel_connectivity_cb_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, + "watch_lb_channel_connectivity_cb_shutdown"); break; } } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 60385272cf5..9ff40aa53c0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -31,6 +31,15 @@ grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); +namespace { +struct pending_pick { + struct pending_pick* next; + uint32_t initial_metadata_flags; + grpc_connected_subchannel** target; + grpc_closure* on_complete; +}; +} // namespace + typedef struct { /** base policy: must be first */ grpc_lb_policy base; @@ -45,7 +54,7 @@ typedef struct { /** are we shut down? */ bool shutdown; /** list of picks that are waiting on connectivity */ - grpc_lb_policy_pick_state* pending_picks; + pending_pick* pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; @@ -63,27 +72,19 @@ static void pf_destroy(grpc_lb_policy* pol) { } } -static void pf_shutdown_locked(grpc_lb_policy* pol, - grpc_lb_policy* new_policy) { +static void pf_shutdown_locked(grpc_lb_policy* pol) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); } p->shutdown = true; - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks) != nullptr) { - p->pending_picks = pick->next; - if (new_policy != nullptr) { - // Hand off to new LB policy. - if (grpc_lb_policy_pick_locked(new_policy, pick)) { - // Synchronous return, schedule closure. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } - } else { - pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); - } + pending_pick* pp; + while ((pp = p->pending_picks) != nullptr) { + p->pending_picks = pp->next; + *pp->target = nullptr; + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); + gpr_free(pp); } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); @@ -103,18 +104,19 @@ static void pf_shutdown_locked(grpc_lb_policy* pol, } static void pf_cancel_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick, + grpc_connected_subchannel** target, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - grpc_lb_policy_pick_state* pp = p->pending_picks; + pending_pick* pp = p->pending_picks; p->pending_picks = nullptr; while (pp != nullptr) { - grpc_lb_policy_pick_state* next = pp->next; - if (pp == pick) { - pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(pick->on_complete, + pending_pick* next = pp->next; + if (pp->target == target) { + *target = nullptr; + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); + gpr_free(pp); } else { pp->next = p->pending_picks; p->pending_picks = pp; @@ -129,20 +131,21 @@ static void pf_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_eq, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - grpc_lb_policy_pick_state* pick = p->pending_picks; + pending_pick* pp = p->pending_picks; p->pending_picks = nullptr; - while (pick != nullptr) { - grpc_lb_policy_pick_state* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + while (pp != nullptr) { + pending_pick* next = pp->next; + if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(pick->on_complete, + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); + gpr_free(pp); } else { - pick->next = p->pending_picks; - p->pending_picks = pick; + pp->next = p->pending_picks; + p->pending_picks = pp; } - pick = next; + pp = next; } GRPC_ERROR_UNREF(error); } @@ -172,20 +175,27 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) { } static int pf_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick) { + const grpc_lb_policy_pick_args* pick_args, + grpc_connected_subchannel** target, + grpc_call_context_element* context, void** user_data, + grpc_closure* on_complete) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; // If we have a selected subchannel already, return synchronously. if (p->selected != nullptr) { - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); + *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel, + "picked"); return 1; } // No subchannel selected yet, so handle asynchronously. if (!p->started_picking) { start_picking_locked(p); } - pick->next = p->pending_picks; - p->pending_picks = pick; + pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->on_complete = on_complete; + p->pending_picks = pp; return 0; } @@ -471,17 +481,18 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Drop all other subchannels, since we are now connected. destroy_unselected_subchannels_locked(p); // Update any calls that were waiting for a pick. - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks)) { - p->pending_picks = pick->next; - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + pending_pick* pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( p->selected->connected_subchannel, "picked"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", (void*)p->selected); } - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 92c7d5bd5d0..a964af06270 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -41,6 +41,31 @@ grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); +namespace { +/** List of entities waiting for a pick. + * + * Once a pick is available, \a target is updated and \a on_complete called. */ +struct pending_pick { + pending_pick* next; + + /* output argument where to store the pick()ed user_data. It'll be NULL if no + * such data is present or there's an error (the definite test for errors is + * \a target being NULL). */ + void** user_data; + + /* bitmask passed to pick() and used for selective cancelling. See + * grpc_lb_policy_cancel_picks() */ + uint32_t initial_metadata_flags; + + /* output argument where to store the pick()ed connected subchannel, or NULL + * upon error. */ + grpc_connected_subchannel** target; + + /* to be invoked once the pick() has completed (regardless of success) */ + grpc_closure* on_complete; +}; +} // namespace + typedef struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -52,7 +77,7 @@ typedef struct round_robin_lb_policy { /** are we shutting down? */ bool shutdown; /** List of picks that are waiting on connectivity */ - grpc_lb_policy_pick_state* pending_picks; + pending_pick* pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; @@ -144,27 +169,19 @@ static void rr_destroy(grpc_lb_policy* pol) { gpr_free(p); } -static void rr_shutdown_locked(grpc_lb_policy* pol, - grpc_lb_policy* new_policy) { +static void rr_shutdown_locked(grpc_lb_policy* pol) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); } p->shutdown = true; - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks) != nullptr) { - p->pending_picks = pick->next; - if (new_policy != nullptr) { - // Hand off to new LB policy. - if (grpc_lb_policy_pick_locked(new_policy, pick)) { - // Synchronous return; schedule callback. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } - } else { - pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); - } + pending_pick* pp; + while ((pp = p->pending_picks) != nullptr) { + p->pending_picks = pp->next; + *pp->target = nullptr; + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); + gpr_free(pp); } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); @@ -184,18 +201,19 @@ static void rr_shutdown_locked(grpc_lb_policy* pol, } static void rr_cancel_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick, + grpc_connected_subchannel** target, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - grpc_lb_policy_pick_state* pp = p->pending_picks; + pending_pick* pp = p->pending_picks; p->pending_picks = nullptr; while (pp != nullptr) { - grpc_lb_policy_pick_state* next = pp->next; - if (pp == pick) { - pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(pick->on_complete, + pending_pick* next = pp->next; + if (pp->target == target) { + *target = nullptr; + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); + gpr_free(pp); } else { pp->next = p->pending_picks; p->pending_picks = pp; @@ -210,21 +228,22 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_eq, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - grpc_lb_policy_pick_state* pick = p->pending_picks; + pending_pick* pp = p->pending_picks; p->pending_picks = nullptr; - while (pick != nullptr) { - grpc_lb_policy_pick_state* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + while (pp != nullptr) { + pending_pick* next = pp->next; + if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(pick->on_complete, + *pp->target = nullptr; + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); + gpr_free(pp); } else { - pick->next = p->pending_picks; - p->pending_picks = pick; + pp->next = p->pending_picks; + p->pending_picks = pp; } - pick = next; + pp = next; } GRPC_ERROR_UNREF(error); } @@ -249,10 +268,13 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) { } static int rr_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick) { + const grpc_lb_policy_pick_args* pick_args, + grpc_connected_subchannel** target, + grpc_call_context_element* context, void** user_data, + grpc_closure* on_complete) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol, + gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol, p->shutdown); } GPR_ASSERT(!p->shutdown); @@ -262,18 +284,18 @@ static int rr_pick_locked(grpc_lb_policy* pol, /* readily available, report right away */ grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[next_ready_index]; - pick->connected_subchannel = + *target = GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); - if (pick->user_data != nullptr) { - *pick->user_data = sd->user_data; + if (user_data != nullptr) { + *user_data = sd->user_data; } if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " - "index %" PRIuPTR ")", - p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list, - next_ready_index); + "index %lu)", + (void*)p, (void*)sd->subchannel, (void*)*target, + (void*)sd->subchannel_list, (unsigned long)next_ready_index); } /* only advance the last picked pointer if the selection was used */ update_last_ready_subchannel_index_locked(p, next_ready_index); @@ -284,8 +306,13 @@ static int rr_pick_locked(grpc_lb_policy* pol, if (!p->started_picking) { start_picking_locked(p); } - pick->next = p->pending_picks; - p->pending_picks = pick; + pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->on_complete = on_complete; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->user_data = user_data; + p->pending_picks = pp; return 0; } @@ -468,13 +495,13 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // picks, update the last picked pointer update_last_ready_subchannel_index_locked(p, next_ready_index); } - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks)) { - p->pending_picks = pick->next; - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + pending_pick* pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_picked"); - if (pick->user_data != nullptr) { - *pick->user_data = selected->user_data; + if (pp->user_data != nullptr) { + *pp->user_data = selected->user_data; } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, @@ -483,7 +510,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { (void*)p, (void*)selected->subchannel, (void*)p->subchannel_list, (unsigned long)next_ready_index); } - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); } } // Renew notification. diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 5ce1298afc4..a3b4c8e524d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -213,13 +213,13 @@ void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, void grpc_lb_subchannel_list_ref_for_connectivity_watch( grpc_lb_subchannel_list* subchannel_list, const char* reason) { - GRPC_LB_POLICY_REF(subchannel_list->policy, reason); + GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason); grpc_lb_subchannel_list_ref(subchannel_list, reason); } void grpc_lb_subchannel_list_unref_for_connectivity_watch( grpc_lb_subchannel_list* subchannel_list, const char* reason) { - GRPC_LB_POLICY_UNREF(subchannel_list->policy, reason); + GRPC_LB_POLICY_WEAK_UNREF(subchannel_list->policy, reason); grpc_lb_subchannel_list_unref(subchannel_list, reason); }