From 53bfe69f707e3729cd5845091a1282771b7e45ee Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 4 Jan 2018 16:39:08 -0800 Subject: [PATCH 1/7] Connected subchannel refactoring --- BUILD | 9 +- .../filters/client_channel/client_channel.cc | 6 +- .../ext/filters/client_channel/lb_policy.h | 2 +- .../lb_policy/pick_first/pick_first.cc | 25 +- .../lb_policy/round_robin/round_robin.cc | 15 +- .../ext/filters/client_channel/subchannel.cc | 250 ++++++++++-------- .../ext/filters/client_channel/subchannel.h | 56 ++-- src/core/lib/debug/trace.h | 5 +- src/core/lib/support/ref_counted.h | 4 +- test/cpp/end2end/client_lb_end2end_test.cc | 36 +++ 10 files changed, 241 insertions(+), 167 deletions(-) diff --git a/BUILD b/BUILD index dba6592f172..290f4c56213 100644 --- a/BUILD +++ b/BUILD @@ -544,24 +544,24 @@ grpc_cc_library( grpc_cc_library( name = "debug_location", - public_hdrs = ["src/core/lib/support/debug_location.h"], language = "c++", + public_hdrs = ["src/core/lib/support/debug_location.h"], ) grpc_cc_library( name = "ref_counted", - public_hdrs = ["src/core/lib/support/ref_counted.h"], language = "c++", + public_hdrs = ["src/core/lib/support/ref_counted.h"], deps = [ - "grpc_trace", "debug_location", + "grpc_trace", ], ) grpc_cc_library( name = "ref_counted_ptr", - public_hdrs = ["src/core/lib/support/ref_counted_ptr.h"], language = "c++", + public_hdrs = ["src/core/lib/support/ref_counted_ptr.h"], ) grpc_cc_library( @@ -919,6 +919,7 @@ grpc_cc_library( deps = [ "grpc_base", "grpc_deadline_filter", + "ref_counted", ], ) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index e99022a91b8..4f3b774212f 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1004,7 +1004,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, grpc_error* error) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - const grpc_connected_subchannel_call_args call_args = { + const grpc_connected_subchannel::CallArgs call_args = { calld->pollent, // pollent calld->path, // path calld->call_start_time, // start_time @@ -1013,8 +1013,8 @@ static void create_subchannel_call_locked(grpc_call_element* elem, calld->subchannel_call_context, // context calld->call_combiner // call_combiner }; - grpc_error* new_error = grpc_connected_subchannel_create_call( - calld->connected_subchannel, &call_args, &calld->subchannel_call); + grpc_error* new_error = calld->connected_subchannel->CreateCall( + &call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 3572c97ed14..628127106b1 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -164,7 +164,7 @@ int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete); -/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) +/** Perform a connected subchannel ping (see \a grpc_connected_subchannel::Ping) against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, grpc_closure* on_initiate, diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 0861261359f..a3b05aacaf6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -225,8 +225,7 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { - grpc_connected_subchannel_ping(p->selected->connected_subchannel, - on_initiate, on_ack); + p->selected->connected_subchannel->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -413,6 +412,18 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { + if (sd->curr_connectivity_state < GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); + } else { // in transient failure or shutdown. Rely on re-resolution to + // recover. + p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "pf_selected_shutdown"); + grpc_lb_subchannel_data_unref_subchannel( + sd, "pf_selected_shutdown"); // Unrefs connected subchannel + } // TODO(juanlishen): we re-resolve when the selected subchannel goes to // TRANSIENT_FAILURE because we used to shut down in this case before // re-resolution is introduced. But we need to investigate whether we @@ -432,16 +443,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); } - if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } else { - p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown"); - } } return; } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index b0c84017dfb..0836dad2f6b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -442,8 +442,19 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // Update state counters and new overall state. update_state_counters_locked(sd); update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); + // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* + // subchannel, if any. + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (sd->connected_subchannel != nullptr) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, + "connected_subchannel_transient_failure"); + sd->connected_subchannel = nullptr; + } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); + } // If the sd's new state is SHUTDOWN, unref the subchannel. - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown"); grpc_lb_subchannel_list_unref_for_connectivity_watch( @@ -540,7 +551,7 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_ping"); - grpc_connected_subchannel_ping(target, on_initiate, on_ack); + target->Ping(on_initiate, on_ack); GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index f07394d29b2..25615b6326f 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/debug_location.h" #include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" @@ -96,7 +97,7 @@ struct grpc_subchannel { grpc_connect_out_args connecting_result; /** callback for connection finishing */ - grpc_closure connected; + grpc_closure on_connected; /** callback for our alarm */ grpc_closure on_alarm; @@ -139,11 +140,10 @@ struct grpc_subchannel_call { }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1)) -#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call*)(callstack)) - 1) -static void subchannel_connected(void* subchannel, grpc_error* error); +static void on_subchannel_connected(void* subchannel, grpc_error* error); #ifndef NDEBUG #define REF_REASON reason @@ -161,20 +161,20 @@ static void subchannel_connected(void* subchannel, grpc_error* error); */ static void connection_destroy(void* arg, grpc_error* error) { - grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg; - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); - gpr_free(c); + grpc_channel_stack* stk = (grpc_channel_stack*)arg; + grpc_channel_stack_destroy(stk); + gpr_free(stk); } grpc_connected_subchannel* grpc_connected_subchannel_ref( grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); + c->Ref(DEBUG_LOCATION, REF_REASON); return c; } void grpc_connected_subchannel_unref( grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); + c->Unref(DEBUG_LOCATION, REF_REASON); } /* @@ -241,16 +241,15 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref( } static void disconnect(grpc_subchannel* c) { - grpc_connected_subchannel* con; grpc_subchannel_index_unregister(c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); - con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); + grpc_connected_subchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "disconnect"); gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); } gpr_mu_unlock(&c->mu); @@ -372,7 +371,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, if (new_args != nullptr) grpc_channel_args_destroy(new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; - GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c, + GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -395,7 +394,7 @@ static void continue_connect_locked(grpc_subchannel* c) { grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->connected); + &c->on_connected); } grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, @@ -479,9 +478,10 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { const grpc_millis time_til_next = c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { - gpr_log(GPR_INFO, "Retry immediately"); + gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c); } else { - gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); + gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c, + time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); @@ -525,75 +525,56 @@ void grpc_subchannel_notify_on_state_change( } } -void grpc_connected_subchannel_process_transport_op( - grpc_connected_subchannel* con, grpc_transport_op* op) { - grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(top_elem, op); -} - -static void subchannel_on_child_state_changed(void* p, grpc_error* error) { - state_watcher* sw = (state_watcher*)p; - grpc_subchannel* c = sw->subchannel; +static void on_connected_subchannel_connectivity_changed(void* p, + grpc_error* error) { + state_watcher* connected_subchannel_watcher = (state_watcher*)p; + grpc_subchannel* c = connected_subchannel_watcher->subchannel; gpr_mu* mu = &c->mu; gpr_mu_lock(mu); + auto* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); /* if we failed just leave this closure */ - if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* any errors on a subchannel ==> we're done, create a new one */ - sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN; + if (connected_subchannel_watcher->connectivity_state == + GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (!c->disconnected && con != nullptr) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "transient_failure"); + gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm) nullptr); + grpc_connectivity_state_set(&c->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "reflect_child"); + c->backoff_begun = false; + c->backoff->Reset(); + if (grpc_trace_stream_refcount.enabled()) { + gpr_log(GPR_INFO, + "Connected subchannel %p of subchannel %p has gone into " + "TRANSIENT_FAILURE. Attempting to reconnect.", + con, c); + } + maybe_start_connecting_locked(c); + goto done; + } else { + connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_SHUTDOWN; + } } - grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state, + grpc_connectivity_state_set(&c->state_tracker, + connected_subchannel_watcher->connectivity_state, GRPC_ERROR_REF(error), "reflect_child"); - if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_connected_subchannel_notify_on_state_change( - GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr, - &sw->connectivity_state, &sw->closure); + if (connected_subchannel_watcher->connectivity_state < + GRPC_CHANNEL_TRANSIENT_FAILURE) { GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - sw = nullptr; + con->NotifyOnStateChange(nullptr, + &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); + connected_subchannel_watcher = nullptr; } - +done: gpr_mu_unlock(mu); GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); - gpr_free(sw); -} - -static void connected_subchannel_state_op(grpc_connected_subchannel* con, - grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, - grpc_closure* closure) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->connectivity_state = state; - op->on_connectivity_state_change = closure; - op->bind_pollset_set = interested_parties; - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(elem, op); -} - -void grpc_connected_subchannel_notify_on_state_change( - grpc_connected_subchannel* con, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* closure) { - connected_subchannel_state_op(con, interested_parties, state, closure); -} - -void grpc_connected_subchannel_ping(grpc_connected_subchannel* con, - grpc_closure* on_initiate, - grpc_closure* on_ack) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->send_ping.on_initiate = on_initiate; - op->send_ping.on_ack = on_ack; - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(elem, op); + gpr_free(connected_subchannel_watcher); } static bool publish_transport_locked(grpc_subchannel* c) { - grpc_connected_subchannel* con; - grpc_channel_stack* stk; - state_watcher* sw_subchannel; - /* construct channel stack */ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( @@ -605,8 +586,9 @@ static bool publish_transport_locked(grpc_subchannel* c) { grpc_channel_stack_builder_destroy(builder); return false; } + grpc_channel_stack* stk; grpc_error* error = grpc_channel_stack_builder_finish( - builder, 0, 1, connection_destroy, nullptr, (void**)&con); + builder, 0, 1, connection_destroy, nullptr, (void**)&stk); if (error != GRPC_ERROR_NONE) { grpc_transport_destroy(c->connecting_result.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", @@ -614,20 +596,21 @@ static bool publish_transport_locked(grpc_subchannel* c) { GRPC_ERROR_UNREF(error); return false; } - stk = CHANNEL_STACK_FROM_CONNECTION(con); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ - sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel)); - sw_subchannel->subchannel = c; - sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; - GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed, - sw_subchannel, grpc_schedule_on_exec_ctx); + state_watcher* connected_subchannel_watcher = + (state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher)); + connected_subchannel_watcher->subchannel = c; + connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY; + GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure, + on_connected_subchannel_connectivity_changed, + connected_subchannel_watcher, grpc_schedule_on_exec_ctx); if (c->disconnected) { - gpr_free(sw_subchannel); + gpr_free(connected_subchannel_watcher); grpc_channel_stack_destroy(stk); - gpr_free(con); + gpr_free(stk); return false; } @@ -636,6 +619,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { I'd have expected the rel_cas below to be enough, but seemingly it's not. Re-evaluate if we really need this. */ + grpc_connected_subchannel* con = + grpc_core::New(stk); gpr_atm_full_barrier(); GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); @@ -643,9 +628,9 @@ static bool publish_transport_locked(grpc_subchannel* c) { ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - grpc_connected_subchannel_notify_on_state_change( - con, c->pollset_set, &sw_subchannel->connectivity_state, - &sw_subchannel->closure); + con->NotifyOnStateChange(c->pollset_set, + &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); /* signal completion */ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, @@ -653,11 +638,11 @@ static bool publish_transport_locked(grpc_subchannel* c) { return true; } -static void subchannel_connected(void* arg, grpc_error* error) { +static void on_subchannel_connected(void* arg, grpc_error* error) { grpc_subchannel* c = (grpc_subchannel*)arg; grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; - GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); + GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected"); gpr_mu_lock(&c->mu); c->connecting = false; if (c->connecting_result.transport != nullptr && @@ -736,36 +721,6 @@ const grpc_subchannel_key* grpc_subchannel_get_key( return subchannel->key; } -grpc_error* grpc_connected_subchannel_create_call( - grpc_connected_subchannel* con, - const grpc_connected_subchannel_call_args* args, - grpc_subchannel_call** call) { - grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = (grpc_subchannel_call*)gpr_arena_alloc( - args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args->context, /* context */ - args->path, /* path */ - args->start_time, /* start_time */ - args->deadline, /* deadline */ - args->arena, /* arena */ - args->call_combiner /* call_combiner */ - }; - grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy, - *call, &call_args); - if (error != GRPC_ERROR_NONE) { - const char* error_string = grpc_error_string(error); - gpr_log(GPR_ERROR, "error: %s", error_string); - return error; - } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); - return GRPC_ERROR_NONE; -} - grpc_call_stack* grpc_subchannel_call_get_call_stack( grpc_subchannel_call* subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); @@ -801,3 +756,70 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } + +grpc_connected_subchannel::grpc_connected_subchannel( + grpc_channel_stack* channel_stack) + : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount), + channel_stack_(channel_stack) {} + +grpc_connected_subchannel* grpc_connected_subchannel::Ref( + const grpc_core::DebugLocation& location, const char* reason) { + GRPC_CHANNEL_STACK_REF(channel_stack_, REF_REASON); + grpc_core::RefCountedWithTracing::Ref(location, reason); + return this; +} +void grpc_connected_subchannel::Unref(const grpc_core::DebugLocation& location, + const char* reason) { + GRPC_CHANNEL_STACK_UNREF(channel_stack_, REF_REASON); + grpc_core::RefCountedWithTracing::Unref(location, reason); +} + +void grpc_connected_subchannel::NotifyOnStateChange( + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +void grpc_connected_subchannel::Ping(grpc_closure* on_initiate, + grpc_closure* on_ack) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args, + grpc_subchannel_call** call) { + *call = (grpc_subchannel_call*)gpr_arena_alloc( + args->arena, + sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size); + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); + (*call)->connection = Ref(DEBUG_LOCATION, "subchannel_call"); + const grpc_call_element_args call_args = { + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args->context, /* context */ + args->path, /* path */ + args->start_time, /* start_time */ + args->deadline, /* deadline */ + args->arena, /* arena */ + args->call_combiner /* call_combiner */ + }; + grpc_error* error = grpc_call_stack_init( + channel_stack_, 1, subchannel_call_destroy, *call, &call_args); + if (error != GRPC_ERROR_NONE) { + const char* error_string = grpc_error_string(error); + gpr_log(GPR_ERROR, "error: %s", error_string); + return error; + } + grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); + return GRPC_ERROR_NONE; +} diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 9d34fff07a8..d9a850daaea 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -23,6 +23,7 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/support/arena.h" +#include "src/core/lib/support/ref_counted.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -32,7 +33,6 @@ /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; -typedef struct grpc_connected_subchannel grpc_connected_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; typedef struct grpc_subchannel_key grpc_subchannel_key; @@ -73,6 +73,34 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif +class grpc_connected_subchannel : public grpc_core::RefCountedWithTracing { + public: + struct CallArgs { + grpc_polling_entity* pollent; + grpc_slice path; + gpr_timespec start_time; + grpc_millis deadline; + gpr_arena* arena; + grpc_call_context_element* context; + grpc_call_combiner* call_combiner; + }; + + grpc_connected_subchannel(grpc_channel_stack* channel_stack); + grpc_connected_subchannel* Ref(const grpc_core::DebugLocation& location, + const char* reason); + void Unref(const grpc_core::DebugLocation& location, const char* reason); + grpc_channel_stack* channel_stack() { return channel_stack_; } + void NotifyOnStateChange(grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, + grpc_closure* closure); + void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); + + grpc_error* CreateCall(const CallArgs* args, grpc_subchannel_call** call); + + private: + grpc_channel_stack* channel_stack_; +}; + grpc_subchannel* grpc_subchannel_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel* grpc_subchannel_ref_from_weak_ref( @@ -92,26 +120,6 @@ void grpc_subchannel_call_ref( void grpc_subchannel_call_unref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -/** construct a subchannel call */ -typedef struct { - grpc_polling_entity* pollent; - grpc_slice path; - gpr_timespec start_time; - grpc_millis deadline; - gpr_arena* arena; - grpc_call_context_element* context; - grpc_call_combiner* call_combiner; -} grpc_connected_subchannel_call_args; - -grpc_error* grpc_connected_subchannel_create_call( - grpc_connected_subchannel* connected_subchannel, - const grpc_connected_subchannel_call_args* args, - grpc_subchannel_call** subchannel_call); - -/** process a transport level op */ -void grpc_connected_subchannel_process_transport_op( - grpc_connected_subchannel* subchannel, grpc_transport_op* op); - /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_subchannel* channel, grpc_error** error); @@ -121,12 +129,6 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( void grpc_subchannel_notify_on_state_change( grpc_subchannel* channel, grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* notify); -void grpc_connected_subchannel_notify_on_state_change( - grpc_connected_subchannel* channel, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* notify); -void grpc_connected_subchannel_ping(grpc_connected_subchannel* channel, - grpc_closure* on_initiate, - grpc_closure* on_ack); /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ diff --git a/src/core/lib/debug/trace.h b/src/core/lib/debug/trace.h index 69ddd802222..6ee1b49f747 100644 --- a/src/core/lib/debug/trace.h +++ b/src/core/lib/debug/trace.h @@ -88,9 +88,10 @@ class TraceFlag { #ifndef NDEBUG typedef TraceFlag DebugOnlyTraceFlag; #else -class DebugOnlyTraceFlag { +class DebugOnlyTraceFlag : public TraceFlag { public: - DebugOnlyTraceFlag(bool default_enabled, const char* name) {} + DebugOnlyTraceFlag(bool default_enabled, const char* name) + : TraceFlag(default_enabled, name) {} bool enabled() { return false; } private: diff --git a/src/core/lib/support/ref_counted.h b/src/core/lib/support/ref_counted.h index 4c662f91190..f2182baea19 100644 --- a/src/core/lib/support/ref_counted.h +++ b/src/core/lib/support/ref_counted.h @@ -23,6 +23,7 @@ #include #include "src/core/lib/debug/trace.h" +#include "src/core/lib/support/abstract.h" #include "src/core/lib/support/debug_location.h" #include "src/core/lib/support/memory.h" @@ -97,6 +98,7 @@ class RefCountedWithTracing { // Not copyable nor movable. RefCountedWithTracing(const RefCountedWithTracing&) = delete; RefCountedWithTracing& operator=(const RefCountedWithTracing&) = delete; + GRPC_ABSTRACT_BASE_CLASS protected: // Allow Delete() to access destructor. @@ -110,8 +112,6 @@ class RefCountedWithTracing { gpr_ref_init(&refs_, 1); } - virtual ~RefCountedWithTracing() {} - private: TraceFlag* trace_flag_ = nullptr; gpr_refcount refs_; diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index c6e9577f0ce..5a7e52e9e94 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -673,6 +673,42 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { GPR_ASSERT(gpr_time_cmp(deadline, now) > 0); } +TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { + const int kNumServers = 3; + StartServers(kNumServers); + const auto ports = GetServersPorts(); + ResetStub(ports, "round_robin"); + SetNextResolution(ports); + for (size_t i = 0; i < kNumServers; ++i) WaitForServer(i); + for (size_t i = 0; i < servers_.size(); ++i) { + CheckRpcSendOk(); + EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; + } + // One request should have gone to each server. + for (size_t i = 0; i < servers_.size(); ++i) { + EXPECT_EQ(1, servers_[i]->service_.request_count()); + } + const auto pre_death = servers_[0]->service_.request_count(); + // Kill the first server. + servers_[0]->Shutdown(true); + // Client request still succeed. May need retrying if RR had returned a pick + // before noticing the change in the server's connectivity. + while (!SendRpc()) + ; // Retry until success. + // Send a bunch of RPCs that should succeed. + for (int i = 0; i < 10 * kNumServers; ++i) CheckRpcSendOk(); + const auto post_death = servers_[0]->service_.request_count(); + // No requests have gone to the deceased server. + EXPECT_EQ(pre_death, post_death); + // Bring the first server back up. + servers_[0].reset(new ServerData(server_host_, ports[0])); + // Requests should start arriving at the first server either right away (if + // the server managed to start before the RR policy retried the subchannel) or + // after the subchannel retry delay otherwise (RR's subchannel retried before + // the server was fully back up). + WaitForServer(0); +} + } // namespace } // namespace testing } // namespace grpc From baf1ac7af91eab2da6024b05ddb83720d9644b94 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 9 Jan 2018 14:24:32 -0800 Subject: [PATCH 2/7] PR comments --- .../filters/client_channel/client_channel.cc | 6 +- .../ext/filters/client_channel/lb_policy.cc | 4 +- .../ext/filters/client_channel/lb_policy.h | 11 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 12 +- .../lb_policy/pick_first/pick_first.cc | 84 +++++-------- .../lb_policy/round_robin/round_robin.cc | 50 ++++---- .../lb_policy/subchannel_list.h | 2 +- .../ext/filters/client_channel/subchannel.cc | 115 +++++++++--------- .../ext/filters/client_channel/subchannel.h | 35 +++--- src/core/lib/support/ref_counted.h | 3 + 10 files changed, 149 insertions(+), 173 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 4f3b774212f..fce5f3582b6 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -856,7 +856,7 @@ typedef struct client_channel_call_data { grpc_closure lb_pick_closure; grpc_closure lb_pick_cancel_closure; - grpc_connected_subchannel* connected_subchannel; + grpc_core::ConnectedSubchannel* connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity* pollent; @@ -1004,7 +1004,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, grpc_error* error) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - const grpc_connected_subchannel::CallArgs call_args = { + const grpc_core::ConnectedSubchannel::CallArgs call_args = { calld->pollent, // pollent calld->path, // path calld->call_start_time, // start_time @@ -1014,7 +1014,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, calld->call_combiner // call_combiner }; grpc_error* new_error = calld->connected_subchannel->CreateCall( - &call_args, &calld->subchannel_call); + call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 7a5a8dec34b..ebaeaadfc58 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -102,7 +102,7 @@ void grpc_lb_policy_weak_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { return policy->vtable->pick_locked(policy, pick_args, target, context, @@ -110,7 +110,7 @@ int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, } void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error) { policy->vtable->cancel_pick_locked(policy, target, error); } diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 628127106b1..967253418e8 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -61,13 +61,13 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_pick */ int (*pick_locked)(grpc_lb_policy* policy, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete); /** \see grpc_lb_policy_cancel_pick */ void (*cancel_pick_locked)(grpc_lb_policy* policy, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error); /** \see grpc_lb_policy_cancel_picks */ @@ -160,11 +160,12 @@ void grpc_lb_policy_init(grpc_lb_policy* policy, in the \a grpc_lb_policy struct. */ int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete); -/** Perform a connected subchannel ping (see \a grpc_connected_subchannel::Ping) +/** Perform a connected subchannel ping (see \a + grpc_core::ConnectedSubchannel::Ping) against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, grpc_closure* on_initiate, @@ -174,7 +175,7 @@ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, The \a on_complete callback of the pending picks will be invoked with \a *target set to NULL. */ void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error); /** Cancel all pending picks for which their \a initial_metadata_flags (as given diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index ba4e90d4c21..ebc7fdac4c8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -157,7 +157,7 @@ typedef struct wrapped_rr_closure_arg { /* the picked target, used to determine which LB token to add to the pick's * initial metadata */ - grpc_connected_subchannel** target; + grpc_core::ConnectedSubchannel** target; /* the context to be populated for the subchannel call */ grpc_call_context_element* context; @@ -242,7 +242,7 @@ typedef struct pending_pick { /* output argument where to store the pick()ed connected subchannel, or * nullptr upon error. */ - grpc_connected_subchannel** target; + grpc_core::ConnectedSubchannel** target; /* args for wrapped_on_complete */ wrapped_rr_closure_arg wrapped_on_complete_arg; @@ -250,7 +250,7 @@ typedef struct pending_pick { static void add_pending_pick(pending_pick** root, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, grpc_closure* on_complete) { pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp)); @@ -657,7 +657,7 @@ static void update_lb_connectivity_status_locked( * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args, - bool force_async, grpc_connected_subchannel** target, + bool force_async, grpc_core::ConnectedSubchannel** target, wrapped_rr_closure_arg* wc_arg) { // Check for drops if we are not using fallback backend addresses. if (glb_policy->serverlist != nullptr) { @@ -1090,7 +1090,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, // we invoke the completion closure and set *target to nullptr right here. static void glb_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; pending_pick* pp = glb_policy->pending_picks; @@ -1184,7 +1184,7 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) { static int glb_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { if (pick_args->lb_token_mdelem_storage == nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index a3b05aacaf6..e70f2a8c52b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -34,7 +34,7 @@ grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); typedef struct pending_pick { struct pending_pick* next; uint32_t initial_metadata_flags; - grpc_connected_subchannel** target; + grpc_core::ConnectedSubchannel** target; grpc_closure* on_complete; } pending_pick; @@ -102,7 +102,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) { } static void pf_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pending_pick* pp = p->pending_picks; @@ -174,7 +174,7 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) { static int pf_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; @@ -396,6 +396,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; // Handle updates for the currently selected subchannel. if (p->selected == sd) { + gpr_log(GPR_INFO, "BAR selected. subchannel %p, conn subchannel %p", + sd->subchannel, p->selected->connected_subchannel); // If the new state is anything other than READY and there is a // pending update, switch to the pending update. if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && @@ -412,25 +414,13 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { - if (sd->curr_connectivity_state < GRPC_CHANNEL_TRANSIENT_FAILURE) { - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } else { // in transient failure or shutdown. Rely on re-resolution to - // recover. - p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel( - sd, "pf_selected_shutdown"); // Unrefs connected subchannel - } // TODO(juanlishen): we re-resolve when the selected subchannel goes to // TRANSIENT_FAILURE because we used to shut down in this case before // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || - sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -438,10 +428,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { p->started_picking = false; grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + // in transient failure. Rely on re-resolution to recover. + p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "pf_selected_shutdown"); + grpc_lb_subchannel_data_unref_subchannel( + sd, "pf_selected_shutdown"); // Unrefs connected subchannel + } else { grpc_connectivity_state_set(&p->state_tracker, sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); } } return; @@ -459,6 +459,16 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to // p->subchannel_list. + grpc_core::ConnectedSubchannel* con = + grpc_subchannel_get_connected_subchannel(sd->subchannel); + if (con == nullptr) { + // The subchannel may have become disconnected by the time this callback + // is invoked. Simply ignore and resubscribe: ulterior connectivity + // states + // must be in the pipeline and will eventually be invoked. + grpc_lb_subchannel_data_start_connectivity_watch(sd); + break; + } if (sd->subchannel_list == p->latest_pending_subchannel_list) { GPR_ASSERT(p->subchannel_list != nullptr); grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, @@ -469,9 +479,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); + sd->connected_subchannel = + GRPC_CONNECTED_SUBCHANNEL_REF(con, "connected"); p->selected = sd; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, @@ -530,39 +539,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } - case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown"); - // Advance to next subchannel and check its state. - grpc_lb_subchannel_data* original_sd = sd; - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == nullptr && sd != original_sd); - if (sd == original_sd) { - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_exhausted_subchannels"); - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, - "exhausted_subchannels+reresolve"); - p->started_picking = false; - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, - GRPC_ERROR_NONE); - } - } else { - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } - } + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(break); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 0836dad2f6b..a6a8fbb3cfe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -58,7 +58,7 @@ typedef struct pending_pick { /* output argument where to store the pick()ed connected subchannel, or NULL * upon error. */ - grpc_connected_subchannel** target; + grpc_core::ConnectedSubchannel** target; /* to be invoked once the pick() has completed (regardless of success) */ grpc_closure* on_complete; @@ -199,7 +199,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) { } static void rr_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_error* error) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; pending_pick* pp = p->pending_picks; @@ -267,7 +267,7 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) { static int rr_pick_locked(grpc_lb_policy* pol, const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, + grpc_core::ConnectedSubchannel** target, grpc_call_context_element* context, void** user_data, grpc_closure* on_complete) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; @@ -316,15 +316,14 @@ static int rr_pick_locked(grpc_lb_policy* pol, static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; + GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { GPR_ASSERT(subchannel_list->num_ready > 0); --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(subchannel_list->num_transient_failures > 0); --subchannel_list->num_transient_failures; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - GPR_ASSERT(subchannel_list->num_shutdown > 0); - --subchannel_list->num_shutdown; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(subchannel_list->num_idle > 0); --subchannel_list->num_idle; @@ -334,8 +333,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++subchannel_list->num_transient_failures; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - ++subchannel_list->num_shutdown; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { ++subchannel_list->num_idle; } @@ -401,6 +398,7 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg; + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); round_robin_lb_policy* p = (round_robin_lb_policy*)sd->subchannel_list->policy; if (grpc_lb_round_robin_trace.enabled()) { @@ -444,23 +442,16 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* // subchannel, if any. - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - if (sd->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, - "connected_subchannel_transient_failure"); - sd->connected_subchannel = nullptr; + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + if (sd->connected_subchannel != nullptr) { + GRPC_CONNECTED_SUBCHANNEL_UNREF( + sd->connected_subchannel, "connected_subchannel_transient_failure"); + sd->connected_subchannel = nullptr; + } + break; } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } - // If the sd's new state is SHUTDOWN, unref the subchannel. - else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "rr_connectivity_shutdown"); - } else { // sd not in SHUTDOWN - if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + case GRPC_CHANNEL_READY: { if (sd->connected_subchannel == nullptr) { sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(sd->subchannel), @@ -522,10 +513,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } + break; } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(); + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE:; // fallthrough } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -549,7 +545,7 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (next_ready_index < p->subchannel_list->num_subchannels) { grpc_lb_subchannel_data* selected = &p->subchannel_list->subchannels[next_ready_index]; - grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_core::ConnectedSubchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_ping"); target->Ping(on_initiate, on_ack); GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0f8cea9347e..e4db3ef464b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -43,7 +43,7 @@ typedef struct { grpc_lb_subchannel_list* subchannel_list; /** subchannel itself */ grpc_subchannel* subchannel; - grpc_connected_subchannel* connected_subchannel; + grpc_core::ConnectedSubchannel* connected_subchannel; /** Is a connectivity notification pending? */ bool connectivity_notification_pending; /** notification that connectivity has changed on subchannel */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 25615b6326f..2f1662e63bd 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -56,8 +56,8 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \ +#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ + ((grpc_core::ConnectedSubchannel*)(gpr_atm_##barrier##_load( \ &(subchannel)->connected_subchannel))) typedef struct { @@ -106,7 +106,8 @@ struct grpc_subchannel { being setup */ grpc_pollset_set* pollset_set; - /** active connection, or null; of type grpc_connected_subchannel */ + /** active connection, or null; of type grpc_core::ConnectedSubchannel + */ gpr_atm connected_subchannel; /** mutex protecting remaining elements */ @@ -135,7 +136,7 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { - grpc_connected_subchannel* connection; + grpc_core::ConnectedSubchannel* connection; grpc_closure* schedule_closure_after_destroy; }; @@ -166,14 +167,14 @@ static void connection_destroy(void* arg, grpc_error* error) { gpr_free(stk); } -grpc_connected_subchannel* grpc_connected_subchannel_ref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_core::ConnectedSubchannel* ConnectedSubchannel_ref( + grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { c->Ref(DEBUG_LOCATION, REF_REASON); return c; } -void grpc_connected_subchannel_unref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void ConnectedSubchannel_unref( + grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { c->Unref(DEBUG_LOCATION, REF_REASON); } @@ -247,7 +248,7 @@ static void disconnect(grpc_subchannel* c) { c->disconnected = true; grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); - grpc_connected_subchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); + grpc_core::ConnectedSubchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != nullptr) { GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "disconnect"); gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); @@ -535,11 +536,15 @@ static void on_connected_subchannel_connectivity_changed(void* p, auto* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); /* if we failed just leave this closure */ - if (connected_subchannel_watcher->connectivity_state == + if (connected_subchannel_watcher->connectivity_state >= GRPC_CHANNEL_TRANSIENT_FAILURE) { if (!c->disconnected && con != nullptr) { GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "transient_failure"); gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm) nullptr); + gpr_log( + GPR_INFO, + "LOL FORMER Connected subchannel %p of subchannel %p is now NULL.", + con, c); grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "reflect_child"); @@ -547,28 +552,28 @@ static void on_connected_subchannel_connectivity_changed(void* p, c->backoff->Reset(); if (grpc_trace_stream_refcount.enabled()) { gpr_log(GPR_INFO, - "Connected subchannel %p of subchannel %p has gone into " - "TRANSIENT_FAILURE. Attempting to reconnect.", - con, c); + "Connected subchannel %p of subchannel %p has gone into %s. " + "Attempting to reconnect.", + con, c, grpc_connectivity_state_name( + connected_subchannel_watcher->connectivity_state)); } maybe_start_connecting_locked(c); - goto done; } else { connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_SHUTDOWN; } + } else { + grpc_connectivity_state_set( + &c->state_tracker, connected_subchannel_watcher->connectivity_state, + GRPC_ERROR_REF(error), "reflect_child"); + if (connected_subchannel_watcher->connectivity_state < + GRPC_CHANNEL_TRANSIENT_FAILURE) { + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); + con->NotifyOnStateChange( + nullptr, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); + connected_subchannel_watcher = nullptr; + } } - grpc_connectivity_state_set(&c->state_tracker, - connected_subchannel_watcher->connectivity_state, - GRPC_ERROR_REF(error), "reflect_child"); - if (connected_subchannel_watcher->connectivity_state < - GRPC_CHANNEL_TRANSIENT_FAILURE) { - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - con->NotifyOnStateChange(nullptr, - &connected_subchannel_watcher->connectivity_state, - &connected_subchannel_watcher->closure); - connected_subchannel_watcher = nullptr; - } -done: gpr_mu_unlock(mu); GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); gpr_free(connected_subchannel_watcher); @@ -619,8 +624,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { I'd have expected the rel_cas below to be enough, but seemingly it's not. Re-evaluate if we really need this. */ - grpc_connected_subchannel* con = - grpc_core::New(stk); + grpc_core::ConnectedSubchannel* con = + grpc_core::New(stk); gpr_atm_full_barrier(); GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); @@ -677,7 +682,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_connected_subchannel* connection = c->connection; + grpc_core::ConnectedSubchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call"); @@ -711,7 +716,7 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call, GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } -grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( +grpc_core::ConnectedSubchannel* grpc_subchannel_get_connected_subchannel( grpc_subchannel* c) { return GET_CONNECTED_SUBCHANNEL(c, acq); } @@ -757,24 +762,16 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } -grpc_connected_subchannel::grpc_connected_subchannel( - grpc_channel_stack* channel_stack) +namespace grpc_core { +ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount), channel_stack_(channel_stack) {} -grpc_connected_subchannel* grpc_connected_subchannel::Ref( - const grpc_core::DebugLocation& location, const char* reason) { - GRPC_CHANNEL_STACK_REF(channel_stack_, REF_REASON); - grpc_core::RefCountedWithTracing::Ref(location, reason); - return this; -} -void grpc_connected_subchannel::Unref(const grpc_core::DebugLocation& location, - const char* reason) { - GRPC_CHANNEL_STACK_UNREF(channel_stack_, REF_REASON); - grpc_core::RefCountedWithTracing::Unref(location, reason); +ConnectedSubchannel::~ConnectedSubchannel() { + GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); } -void grpc_connected_subchannel::NotifyOnStateChange( +void ConnectedSubchannel::NotifyOnStateChange( grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* closure) { grpc_transport_op* op = grpc_make_transport_op(nullptr); @@ -786,8 +783,8 @@ void grpc_connected_subchannel::NotifyOnStateChange( elem->filter->start_transport_op(elem, op); } -void grpc_connected_subchannel::Ping(grpc_closure* on_initiate, - grpc_closure* on_ack) { +void ConnectedSubchannel::Ping(grpc_closure* on_initiate, + grpc_closure* on_ack) { grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_channel_element* elem; op->send_ping.on_initiate = on_initiate; @@ -796,22 +793,23 @@ void grpc_connected_subchannel::Ping(grpc_closure* on_initiate, elem->filter->start_transport_op(elem, op); } -grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args, - grpc_subchannel_call** call) { +grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, + grpc_subchannel_call** call) { *call = (grpc_subchannel_call*)gpr_arena_alloc( - args->arena, + args.arena, sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size); grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - (*call)->connection = Ref(DEBUG_LOCATION, "subchannel_call"); + Ref(DEBUG_LOCATION, "subchannel_call"); + (*call)->connection = this; const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args->context, /* context */ - args->path, /* path */ - args->start_time, /* start_time */ - args->deadline, /* deadline */ - args->arena, /* arena */ - args->call_combiner /* call_combiner */ + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args.context, /* context */ + args.path, /* path */ + args.start_time, /* start_time */ + args.deadline, /* deadline */ + args.arena, /* arena */ + args.call_combiner /* call_combiner */ }; grpc_error* error = grpc_call_stack_init( channel_stack_, 1, subchannel_call_destroy, *call, &call_args); @@ -820,6 +818,7 @@ grpc_error* grpc_connected_subchannel::CreateCall(const CallArgs* args, gpr_log(GPR_ERROR, "error: %s", error_string); return error; } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); + grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); return GRPC_ERROR_NONE; } +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index d9a850daaea..dbc57427873 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -49,9 +49,9 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \ grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \ - grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r)) + ConnectedSubchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ - grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r)) + ConnectedSubchannel_unref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) \ grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ @@ -65,15 +65,15 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ - grpc_connected_subchannel_unref((p)) +#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) ConnectedSubchannel_ref((p)) +#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) ConnectedSubchannel_unref((p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif -class grpc_connected_subchannel : public grpc_core::RefCountedWithTracing { +namespace grpc_core { +class ConnectedSubchannel : public grpc_core::RefCountedWithTracing { public: struct CallArgs { grpc_polling_entity* pollent; @@ -85,21 +85,20 @@ class grpc_connected_subchannel : public grpc_core::RefCountedWithTracing { grpc_call_combiner* call_combiner; }; - grpc_connected_subchannel(grpc_channel_stack* channel_stack); - grpc_connected_subchannel* Ref(const grpc_core::DebugLocation& location, - const char* reason); - void Unref(const grpc_core::DebugLocation& location, const char* reason); + explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); + ~ConnectedSubchannel(); + grpc_channel_stack* channel_stack() { return channel_stack_; } void NotifyOnStateChange(grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); - - grpc_error* CreateCall(const CallArgs* args, grpc_subchannel_call** call); + grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); private: grpc_channel_stack* channel_stack_; }; +} // namespace grpc_core grpc_subchannel* grpc_subchannel_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); @@ -111,10 +110,10 @@ grpc_subchannel* grpc_subchannel_weak_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -grpc_connected_subchannel* grpc_connected_subchannel_ref( - grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_connected_subchannel_unref( - grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_core::ConnectedSubchannel* ConnectedSubchannel_ref( + grpc_core::ConnectedSubchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void ConnectedSubchannel_unref( + grpc_core::ConnectedSubchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref( @@ -130,9 +129,9 @@ void grpc_subchannel_notify_on_state_change( grpc_subchannel* channel, grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* notify); -/** retrieve the grpc_connected_subchannel - or NULL if called before +/** retrieve the grpc_core::ConnectedSubchannel - or NULL if called before the subchannel becomes connected */ -grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( +grpc_core::ConnectedSubchannel* grpc_subchannel_get_connected_subchannel( grpc_subchannel* subchannel); /** return the subchannel index key for \a subchannel */ diff --git a/src/core/lib/support/ref_counted.h b/src/core/lib/support/ref_counted.h index f2182baea19..8fdc3458d16 100644 --- a/src/core/lib/support/ref_counted.h +++ b/src/core/lib/support/ref_counted.h @@ -45,6 +45,7 @@ class RefCounted { // Not copyable nor movable. RefCounted(const RefCounted&) = delete; RefCounted& operator=(const RefCounted&) = delete; + GRPC_ABSTRACT_BASE_CLASS protected: // Allow Delete() to access destructor. @@ -112,6 +113,8 @@ class RefCountedWithTracing { gpr_ref_init(&refs_, 1); } + virtual ~RefCountedWithTracing() {} + private: TraceFlag* trace_flag_ = nullptr; gpr_refcount refs_; From dfa2851462809a459dd734090c221ea322064c7d Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 11 Jan 2018 18:31:13 -0800 Subject: [PATCH 3/7] PR comments, round 2 --- .../filters/client_channel/client_channel.cc | 8 +- .../ext/filters/client_channel/lb_policy.h | 5 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 8 +- .../lb_policy/pick_first/pick_first.cc | 27 ++--- .../lb_policy/round_robin/round_robin.cc | 39 +++--- .../lb_policy/subchannel_list.cc | 5 +- .../lb_policy/subchannel_list.h | 3 +- .../ext/filters/client_channel/subchannel.cc | 112 ++++++++---------- .../ext/filters/client_channel/subchannel.h | 21 +--- src/core/lib/debug/trace.h | 5 +- src/core/lib/support/ref_counted.h | 8 +- src/core/lib/support/ref_counted_ptr.h | 2 + 12 files changed, 104 insertions(+), 139 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 36134c6b06e..6a10a25a325 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1031,7 +1031,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - if (calld->pick.connected_subchannel == nullptr) { + if (!calld->pick.connected_subchannel) { // Failed to create subchannel. GRPC_ERROR_UNREF(calld->error); calld->error = error == GRPC_ERROR_NONE @@ -1283,7 +1283,7 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - GPR_ASSERT(calld->pick.connected_subchannel == nullptr); + GPR_ASSERT(!calld->pick.connected_subchannel); if (chand->lb_policy != nullptr) { // We already have an LB policy, so ask it for a pick. if (pick_callback_start_locked(elem)) { @@ -1462,8 +1462,8 @@ static void cc_destroy_call_elem(grpc_call_element* elem, "client_channel_destroy_call"); } GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); - if (calld->pick.connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked"); + if (calld->pick.connected_subchannel) { + calld->pick.connected_subchannel.reset(); } for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { if (calld->pick.subchannel_call_context[i].value != nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 414cdd15796..0cc0cb59c3e 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -21,6 +21,7 @@ #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" /** A load balancing policy: specified by a vtable and a struct (which @@ -54,9 +55,9 @@ typedef struct grpc_lb_policy_pick_state { grpc_linked_mdelem lb_token_mdelem_storage; /// Closure to run when pick is complete, if not completed synchronously. grpc_closure* on_complete; - /// Will be set to the selected subchannel, or NULL on failure or when + /// Will be set to the selected subchannel, or nullptr on failure or when /// the LB policy decides to drop the call. - grpc_core::ConnectedSubchannel* connected_subchannel; + grpc_core::RefCountedPtr connected_subchannel; /// Will be populated with context to pass to the subchannel call, if needed. grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; /// Upon success, \a *user_data will be set to whatever opaque information diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 5849ac9d2da..39467272b36 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -322,7 +322,7 @@ static void pending_pick_set_metadata_and_context(pending_pick* pp) { /* if connected_subchannel is nullptr, no pick has been made by the RR * policy (e.g., all addresses failed to connect). There won't be any * user_data/token available */ - if (pp->pick->connected_subchannel != nullptr) { + if (pp->pick->connected_subchannel.get() != nullptr) { if (!GRPC_MDISNULL(pp->lb_token)) { initial_metadata_add_lb_token(pp->pick->initial_metadata, &pp->pick->lb_token_mdelem_storage, @@ -935,7 +935,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, } gpr_free(pp); } else { - pp->pick->connected_subchannel = nullptr; + pp->pick->connected_subchannel = + grpc_core::MakeRefCounted(nullptr); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); } pp = next; @@ -972,7 +973,8 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { pending_pick* next = pp->next; if (pp->pick == pick) { - pick->connected_subchannel = nullptr; + pick->connected_subchannel = + grpc_core::MakeRefCounted(nullptr); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 58dcbffb2d8..c91a305f336 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -81,7 +81,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol, GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } else { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } } @@ -111,7 +111,7 @@ static void pf_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { grpc_lb_policy_pick_state* next = pp->next; if (pp == pick) { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); @@ -176,8 +176,7 @@ static int pf_pick_locked(grpc_lb_policy* pol, pick_first_lb_policy* p = (pick_first_lb_policy*)pol; // If we have a selected subchannel already, return synchronously. if (p->selected != nullptr) { - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); + pick->connected_subchannel = p->selected->connected_subchannel; return 1; } // No subchannel selected yet, so handle asynchronously. @@ -295,9 +294,8 @@ static void pf_update_locked(grpc_lb_policy* policy, p, p->selected->subchannel, i, subchannel_list->num_subchannels); } - if (p->selected->connected_subchannel != nullptr) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "pf_update_includes_selected"); + if (p->selected->connected_subchannel) { + sd->connected_subchannel = p->selected->connected_subchannel; } p->selected = sd; if (p->subchannel_list != nullptr) { @@ -425,7 +423,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { sd->subchannel_list, "pf_selected_shutdown"); grpc_lb_subchannel_data_unref_subchannel( sd, "pf_selected_shutdown"); // Unrefs connected subchannel - } else { grpc_connectivity_state_set(&p->state_tracker, sd->curr_connectivity_state, @@ -449,15 +446,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to // p->subchannel_list. - grpc_core::ConnectedSubchannel* con = + sd->connected_subchannel = grpc_subchannel_get_connected_subchannel(sd->subchannel); - if (con == nullptr) { - // The subchannel may have become disconnected by the time this callback - // is invoked. Simply ignore and resubscribe: ulterior connectivity - // states must be in the pipeline and will eventually be invoked. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - break; - } if (sd->subchannel_list == p->latest_pending_subchannel_list) { GPR_ASSERT(p->subchannel_list != nullptr); grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, @@ -469,7 +459,7 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); sd->connected_subchannel = - GRPC_CONNECTED_SUBCHANNEL_REF(con, "connected"); + grpc_subchannel_get_connected_subchannel(sd->subchannel); p->selected = sd; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, @@ -481,8 +471,7 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_policy_pick_state* pick; while ((pick = p->pending_picks)) { p->pending_picks = pick->next; - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); + pick->connected_subchannel = p->selected->connected_subchannel; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 8d3fd637572..91fee2d51aa 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -36,6 +36,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/support/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" @@ -127,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, (void*)p, (unsigned long)last_ready_index, (void*)p->subchannel_list->subchannels[last_ready_index].subchannel, (void*)p->subchannel_list->subchannels[last_ready_index] - .connected_subchannel); + .connected_subchannel.get()); } } @@ -162,7 +163,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol, GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } else { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } } @@ -192,7 +193,7 @@ static void rr_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { grpc_lb_policy_pick_state* next = pp->next; if (pp == pick) { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); @@ -216,7 +217,7 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol, grpc_lb_policy_pick_state* next = pick->next; if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); @@ -262,8 +263,7 @@ static int rr_pick_locked(grpc_lb_policy* pol, /* readily available, report right away */ grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[next_ready_index]; - pick->connected_subchannel = - GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); + pick->connected_subchannel = sd->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = sd->user_data; } @@ -272,8 +272,8 @@ static int rr_pick_locked(grpc_lb_policy* pol, GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "index %" PRIuPTR ")", - p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list, - next_ready_index); + p, sd->subchannel, pick->connected_subchannel.get(), + sd->subchannel_list, next_ready_index); } /* only advance the last picked pointer if the selection was used */ update_last_ready_subchannel_index_locked(p, next_ready_index); @@ -373,7 +373,6 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg; - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); round_robin_lb_policy* p = (round_robin_lb_policy*)sd->subchannel_list->policy; if (grpc_lb_round_robin_trace.enabled()) { @@ -408,6 +407,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // either the current or latest pending subchannel lists. GPR_ASSERT(sd->subchannel_list == p->subchannel_list || sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -419,18 +419,13 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // subchannel, if any. switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: { - if (sd->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF( - sd->connected_subchannel, "connected_subchannel_transient_failure"); - sd->connected_subchannel = nullptr; - } + sd->connected_subchannel.reset(); break; } case GRPC_CHANNEL_READY: { - if (sd->connected_subchannel == nullptr) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); + if (!sd->connected_subchannel) { + sd->connected_subchannel = + grpc_subchannel_get_connected_subchannel(sd->subchannel); } if (sd->subchannel_list != p->subchannel_list) { // promote sd->subchannel_list to p->subchannel_list. @@ -473,8 +468,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_policy_pick_state* pick; while ((pick = p->pending_picks)) { p->pending_picks = pick->next; - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - selected->connected_subchannel, "rr_picked"); + pick->connected_subchannel = selected->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = selected->user_data; } @@ -519,10 +513,9 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (next_ready_index < p->subchannel_list->num_subchannels) { grpc_lb_subchannel_data* selected = &p->subchannel_list->subchannels[next_ready_index]; - grpc_core::ConnectedSubchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( - selected->connected_subchannel, "rr_ping"); + grpc_core::RefCountedPtr target = + selected->connected_subchannel; target->Ping(on_initiate, on_ack); - GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 5ce1298afc4..fa2ffcc7966 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -42,10 +42,7 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, } GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason); sd->subchannel = nullptr; - if (sd->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason); - sd->connected_subchannel = nullptr; - } + sd->connected_subchannel.reset(); if (sd->user_data != nullptr) { GPR_ASSERT(sd->user_data_vtable != nullptr); sd->user_data_vtable->destroy(sd->user_data); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index e4db3ef464b..f146c724e07 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -22,6 +22,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/support/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" // TODO(roth): This code is intended to be shared between pick_first and @@ -43,7 +44,7 @@ typedef struct { grpc_lb_subchannel_list* subchannel_list; /** subchannel itself */ grpc_subchannel* subchannel; - grpc_core::ConnectedSubchannel* connected_subchannel; + grpc_core::RefCountedPtr connected_subchannel; /** Is a connectivity notification pending? */ bool connectivity_notification_pending; /** notification that connectivity has changed on subchannel */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 37c43d78ba4..fe0bb486e87 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -108,13 +108,13 @@ struct grpc_subchannel { being setup */ grpc_pollset_set* pollset_set; - /** active connection, or null; of type grpc_core::ConnectedSubchannel - */ - gpr_atm connected_subchannel; - /** mutex protecting remaining elements */ gpr_mu mu; + /** active connection, or null; of type grpc_core::ConnectedSubchannel + */ + grpc_core::RefCountedPtr connected_subchannel; + /** have we seen a disconnection? */ bool disconnected; /** are we connecting */ @@ -169,17 +169,6 @@ static void connection_destroy(void* arg, grpc_error* error) { gpr_free(stk); } -grpc_core::ConnectedSubchannel* grpc_connected_subchannel_ref( - grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - c->Ref(DEBUG_LOCATION, REF_REASON); - return c; -} - -void grpc_connected_subchannel_unref( - grpc_core::ConnectedSubchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - c->Unref(DEBUG_LOCATION, REF_REASON); -} - /* * grpc_subchannel implementation */ @@ -250,11 +239,7 @@ static void disconnect(grpc_subchannel* c) { c->disconnected = true; grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); - grpc_core::ConnectedSubchannel* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - if (con != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "disconnect"); - gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); - } + c->connected_subchannel.reset(); gpr_mu_unlock(&c->mu); } @@ -458,7 +443,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { return; } - if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) { + if (c->connected_subchannel) { /* Already connected: don't restart */ return; } @@ -536,38 +521,37 @@ static void on_connected_subchannel_connectivity_changed(void* p, gpr_mu_lock(mu); - auto* con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - /* if we failed just leave this closure */ - if (connected_subchannel_watcher->connectivity_state >= - GRPC_CHANNEL_TRANSIENT_FAILURE) { - if (!c->disconnected && con != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "transient_failure"); - gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm) nullptr); - grpc_connectivity_state_set(&c->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "reflect_child"); - c->backoff_begun = false; - c->backoff->Reset(); - if (grpc_trace_stream_refcount.enabled()) { - gpr_log(GPR_INFO, - "Connected subchannel %p of subchannel %p has gone into %s. " - "Attempting to reconnect.", - con, c, - grpc_connectivity_state_name( - connected_subchannel_watcher->connectivity_state)); + switch (connected_subchannel_watcher->connectivity_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: { + if (!c->disconnected && c->connected_subchannel) { + if (grpc_trace_stream_refcount.enabled()) { + gpr_log(GPR_INFO, + "Connected subchannel %p of subchannel %p has gone into %s. " + "Attempting to reconnect.", + c->connected_subchannel.get(), c, + grpc_connectivity_state_name( + connected_subchannel_watcher->connectivity_state)); + } + c->connected_subchannel.reset(); + grpc_connectivity_state_set(&c->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "reflect_child"); + c->backoff_begun = false; + c->backoff->Reset(); + maybe_start_connecting_locked(c); + } else { + connected_subchannel_watcher->connectivity_state = + GRPC_CHANNEL_SHUTDOWN; } - maybe_start_connecting_locked(c); - } else { - connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_SHUTDOWN; + break; } - } else { - grpc_connectivity_state_set( - &c->state_tracker, connected_subchannel_watcher->connectivity_state, - GRPC_ERROR_REF(error), "reflect_child"); - if (connected_subchannel_watcher->connectivity_state < - GRPC_CHANNEL_TRANSIENT_FAILURE) { + default: { + grpc_connectivity_state_set( + &c->state_tracker, connected_subchannel_watcher->connectivity_state, + GRPC_ERROR_REF(error), "reflect_child"); GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - con->NotifyOnStateChange( + c->connected_subchannel->NotifyOnStateChange( nullptr, &connected_subchannel_watcher->connectivity_state, &connected_subchannel_watcher->closure); connected_subchannel_watcher = nullptr; @@ -619,22 +603,19 @@ static bool publish_transport_locked(grpc_subchannel* c) { } /* publish */ - /* TODO(ctiller): this full barrier seems to clear up a TSAN failure. - I'd have expected the rel_cas below to be enough, but - seemingly it's not. - Re-evaluate if we really need this. */ - grpc_core::ConnectedSubchannel* con = - grpc_core::New(stk); + c->connected_subchannel.reset( + grpc_core::New(stk)); gpr_atm_full_barrier(); - GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); + gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", + c->connected_subchannel.get(), c); /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - con->NotifyOnStateChange(c->pollset_set, - &connected_subchannel_watcher->connectivity_state, - &connected_subchannel_watcher->closure); + c->connected_subchannel->NotifyOnStateChange( + c->pollset_set, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); /* signal completion */ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, @@ -684,7 +665,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_core::ConnectedSubchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); - GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call"); + connection->Unref(DEBUG_LOCATION, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } @@ -715,9 +696,12 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call, GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } -grpc_core::ConnectedSubchannel* grpc_subchannel_get_connected_subchannel( - grpc_subchannel* c) { - return GET_CONNECTED_SUBCHANNEL(c, acq); +grpc_core::RefCountedPtr +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { + gpr_mu_lock(&c->mu); + auto copy = c->connected_subchannel; + gpr_mu_unlock(&c->mu); + return copy; } const grpc_subchannel_key* grpc_subchannel_get_key( diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 61fa97a21aa..3bcc5c2432e 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -24,6 +24,7 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/support/arena.h" #include "src/core/lib/support/ref_counted.h" +#include "src/core/lib/support/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -48,10 +49,6 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \ grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r)) -#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \ - grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ - grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) \ grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ @@ -65,9 +62,6 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ - grpc_connected_subchannel_unref((p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS @@ -111,10 +105,6 @@ grpc_subchannel* grpc_subchannel_weak_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -grpc_core::ConnectedSubchannel* grpc_connected_subchannel_ref( - grpc_core::ConnectedSubchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_connected_subchannel_unref( - grpc_core::ConnectedSubchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref( @@ -130,10 +120,11 @@ void grpc_subchannel_notify_on_state_change( grpc_subchannel* channel, grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* notify); -/** retrieve the grpc_core::ConnectedSubchannel - or NULL if called before - the subchannel becomes connected */ -grpc_core::ConnectedSubchannel* grpc_subchannel_get_connected_subchannel( - grpc_subchannel* subchannel); +/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected + * (which may happen before it initially connects or during transient failures) + * */ +grpc_core::RefCountedPtr +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c); /** return the subchannel index key for \a subchannel */ const grpc_subchannel_key* grpc_subchannel_get_key( diff --git a/src/core/lib/debug/trace.h b/src/core/lib/debug/trace.h index 6ee1b49f747..69ddd802222 100644 --- a/src/core/lib/debug/trace.h +++ b/src/core/lib/debug/trace.h @@ -88,10 +88,9 @@ class TraceFlag { #ifndef NDEBUG typedef TraceFlag DebugOnlyTraceFlag; #else -class DebugOnlyTraceFlag : public TraceFlag { +class DebugOnlyTraceFlag { public: - DebugOnlyTraceFlag(bool default_enabled, const char* name) - : TraceFlag(default_enabled, name) {} + DebugOnlyTraceFlag(bool default_enabled, const char* name) {} bool enabled() { return false; } private: diff --git a/src/core/lib/support/ref_counted.h b/src/core/lib/support/ref_counted.h index 8fdc3458d16..eaa293a86c1 100644 --- a/src/core/lib/support/ref_counted.h +++ b/src/core/lib/support/ref_counted.h @@ -106,13 +106,19 @@ class RefCountedWithTracing { template friend void Delete(T*); - RefCountedWithTracing() : RefCountedWithTracing(nullptr) {} + RefCountedWithTracing() + : RefCountedWithTracing(static_cast(nullptr)) {} explicit RefCountedWithTracing(TraceFlag* trace_flag) : trace_flag_(trace_flag) { gpr_ref_init(&refs_, 1); } +#ifdef NDEBUG + explicit RefCountedWithTracing(DebugOnlyTraceFlag* trace_flag) + : RefCountedWithTracing() {} +#endif + virtual ~RefCountedWithTracing() {} private: diff --git a/src/core/lib/support/ref_counted_ptr.h b/src/core/lib/support/ref_counted_ptr.h index dc2385e369c..dad213003d6 100644 --- a/src/core/lib/support/ref_counted_ptr.h +++ b/src/core/lib/support/ref_counted_ptr.h @@ -76,6 +76,8 @@ class RefCountedPtr { T& operator*() const { return *value_; } T* operator->() const { return value_; } + explicit operator bool() const { return get() != nullptr; } + private: T* value_ = nullptr; }; From be1b7f986e55756ac83b3807ee1d73929fbb3546 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 12 Jan 2018 14:01:38 -0800 Subject: [PATCH 4/7] PR comments, round 3 --- src/core/ext/filters/client_channel/client_channel.cc | 6 +++--- .../filters/client_channel/lb_policy/grpclb/grpclb.cc | 8 +++----- .../client_channel/lb_policy/pick_first/pick_first.cc | 4 +--- .../client_channel/lb_policy/round_robin/round_robin.cc | 2 +- src/core/ext/filters/client_channel/subchannel.cc | 9 ++------- src/core/lib/support/ref_counted_ptr.h | 9 ++++++++- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 6a10a25a325..bb29f65af97 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1031,7 +1031,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - if (!calld->pick.connected_subchannel) { + if (calld->pick.connected_subchannel == nullptr) { // Failed to create subchannel. GRPC_ERROR_UNREF(calld->error); calld->error = error == GRPC_ERROR_NONE @@ -1283,7 +1283,7 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - GPR_ASSERT(!calld->pick.connected_subchannel); + GPR_ASSERT(calld->pick.connected_subchannel == nullptr); if (chand->lb_policy != nullptr) { // We already have an LB policy, so ask it for a pick. if (pick_callback_start_locked(elem)) { @@ -1462,7 +1462,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, "client_channel_destroy_call"); } GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); - if (calld->pick.connected_subchannel) { + if (calld->pick.connected_subchannel != nullptr) { calld->pick.connected_subchannel.reset(); } for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 39467272b36..9da21c73924 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -322,7 +322,7 @@ static void pending_pick_set_metadata_and_context(pending_pick* pp) { /* if connected_subchannel is nullptr, no pick has been made by the RR * policy (e.g., all addresses failed to connect). There won't be any * user_data/token available */ - if (pp->pick->connected_subchannel.get() != nullptr) { + if (pp->pick->connected_subchannel != nullptr) { if (!GRPC_MDISNULL(pp->lb_token)) { initial_metadata_add_lb_token(pp->pick->initial_metadata, &pp->pick->lb_token_mdelem_storage, @@ -935,8 +935,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, } gpr_free(pp); } else { - pp->pick->connected_subchannel = - grpc_core::MakeRefCounted(nullptr); + pp->pick->connected_subchannel.reset(nullptr); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); } pp = next; @@ -973,8 +972,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { pending_pick* next = pp->next; if (pp->pick == pick) { - pick->connected_subchannel = - grpc_core::MakeRefCounted(nullptr); + pick->connected_subchannel.reset(nullptr); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index c91a305f336..725b78d4787 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -294,7 +294,7 @@ static void pf_update_locked(grpc_lb_policy* policy, p, p->selected->subchannel, i, subchannel_list->num_subchannels); } - if (p->selected->connected_subchannel) { + if (p->selected->connected_subchannel != nullptr) { sd->connected_subchannel = p->selected->connected_subchannel; } p->selected = sd; @@ -458,8 +458,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); p->selected = sd; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 91fee2d51aa..dca345566ad 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -423,7 +423,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { break; } case GRPC_CHANNEL_READY: { - if (!sd->connected_subchannel) { + if (sd->connected_subchannel == nullptr) { sd->connected_subchannel = grpc_subchannel_get_connected_subchannel(sd->subchannel); } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index fe0bb486e87..5b8a14b9e7b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -56,10 +56,6 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_core::ConnectedSubchannel*)(gpr_atm_##barrier##_load( \ - &(subchannel)->connected_subchannel))) - namespace { struct state_watcher { grpc_closure closure; @@ -443,7 +439,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { return; } - if (c->connected_subchannel) { + if (c->connected_subchannel != nullptr) { /* Already connected: don't restart */ return; } @@ -524,7 +520,7 @@ static void on_connected_subchannel_connectivity_changed(void* p, switch (connected_subchannel_watcher->connectivity_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: { - if (!c->disconnected && c->connected_subchannel) { + if (!c->disconnected && c->connected_subchannel != nullptr) { if (grpc_trace_stream_refcount.enabled()) { gpr_log(GPR_INFO, "Connected subchannel %p of subchannel %p has gone into %s. " @@ -605,7 +601,6 @@ static bool publish_transport_locked(grpc_subchannel* c) { /* publish */ c->connected_subchannel.reset( grpc_core::New(stk)); - gpr_atm_full_barrier(); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); diff --git a/src/core/lib/support/ref_counted_ptr.h b/src/core/lib/support/ref_counted_ptr.h index dad213003d6..8c8606ca0a1 100644 --- a/src/core/lib/support/ref_counted_ptr.h +++ b/src/core/lib/support/ref_counted_ptr.h @@ -76,7 +76,14 @@ class RefCountedPtr { T& operator*() const { return *value_; } T* operator->() const { return value_; } - explicit operator bool() const { return get() != nullptr; } + bool operator==(const RefCountedPtr& other) const { + return value_ == other.value_; + } + bool operator==(T* other) const { return value_ == other; } + bool operator!=(const RefCountedPtr& other) const { + return value_ != other.value_; + } + bool operator!=(T* other) const { return value_ != other; } private: T* value_ = nullptr; From 269ee29e032e3edc62d5b63ce44b5135979249da Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 12 Jan 2018 14:18:37 -0800 Subject: [PATCH 5/7] PR comments, round 4 --- .../ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 9da21c73924..d71cb2fefa7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -935,7 +935,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, } gpr_free(pp); } else { - pp->pick->connected_subchannel.reset(nullptr); + pp->pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); } pp = next; @@ -972,7 +972,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { pending_pick* next = pp->next; if (pp->pick == pick) { - pick->connected_subchannel.reset(nullptr); + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); From 156d6e15bb97c803e9b6cc20f13578df6752a398 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 17 Jan 2018 12:25:27 -0800 Subject: [PATCH 6/7] Fix bad merge --- src/core/lib/support/ref_counted.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/core/lib/support/ref_counted.h b/src/core/lib/support/ref_counted.h index 1adf867d65f..48c11f7bbfe 100644 --- a/src/core/lib/support/ref_counted.h +++ b/src/core/lib/support/ref_counted.h @@ -45,7 +45,6 @@ class RefCounted { // Not copyable nor movable. RefCounted(const RefCounted&) = delete; RefCounted& operator=(const RefCounted&) = delete; - GRPC_ABSTRACT_BASE_CLASS GRPC_ABSTRACT_BASE_CLASS @@ -101,7 +100,6 @@ class RefCountedWithTracing { // Not copyable nor movable. RefCountedWithTracing(const RefCountedWithTracing&) = delete; RefCountedWithTracing& operator=(const RefCountedWithTracing&) = delete; - GRPC_ABSTRACT_BASE_CLASS GRPC_ABSTRACT_BASE_CLASS From 4adad9997164a901cd2e5d1b7a636d940a86d581 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 17 Jan 2018 13:50:39 -0800 Subject: [PATCH 7/7] fix BUILD --- BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/BUILD b/BUILD index 5e48a2106d0..186d664395a 100644 --- a/BUILD +++ b/BUILD @@ -940,6 +940,7 @@ grpc_cc_library( "grpc_base", "grpc_deadline_filter", "ref_counted", + "ref_counted_ptr", ], )