From ebe5fbfbe8ff07102a7fd42e90441867a5beb8f0 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Thu, 25 Jan 2018 23:50:46 +0100 Subject: [PATCH] Revert "Merge pull request #13932 from dgquintas/conn_subchannel" This reverts commit a8891634d32ad9556921faed2707fb304304c900, reversing changes made to 47fe8507a1905c20a86df09f97c3f972d643dda5. --- .../filters/client_channel/client_channel.cc | 8 +- .../ext/filters/client_channel/lb_policy.h | 7 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 4 +- .../lb_policy/pick_first/pick_first.cc | 75 +++-- .../lb_policy/round_robin/round_robin.cc | 63 ++-- .../lb_policy/subchannel_list.cc | 5 +- .../lb_policy/subchannel_list.h | 3 +- .../ext/filters/client_channel/subchannel.cc | 296 +++++++++--------- .../ext/filters/client_channel/subchannel.h | 77 +++-- test/cpp/end2end/client_lb_end2end_test.cc | 36 --- 10 files changed, 295 insertions(+), 279 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index a8a7a37be01..428945d5ad1 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1003,7 +1003,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, grpc_error* error) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - const grpc_core::ConnectedSubchannel::CallArgs call_args = { + const grpc_connected_subchannel_call_args call_args = { calld->pollent, // pollent calld->path, // path calld->call_start_time, // start_time @@ -1012,8 +1012,8 @@ static void create_subchannel_call_locked(grpc_call_element* elem, calld->pick.subchannel_call_context, // context calld->call_combiner // call_combiner }; - grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( - call_args, &calld->subchannel_call); + grpc_error* new_error = grpc_connected_subchannel_create_call( + calld->pick.connected_subchannel, &call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); @@ -1463,7 +1463,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, } GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); if (calld->pick.connected_subchannel != nullptr) { - calld->pick.connected_subchannel.reset(); + GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked"); } for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { if (calld->pick.subchannel_call_context[i].value != nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index e19726efb3b..ea70de068e3 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -55,9 +55,9 @@ typedef struct grpc_lb_policy_pick_state { grpc_linked_mdelem lb_token_mdelem_storage; /// Closure to run when pick is complete, if not completed synchronously. grpc_closure* on_complete; - /// Will be set to the selected subchannel, or nullptr on failure or when + /// Will be set to the selected subchannel, or NULL on failure or when /// the LB policy decides to drop the call. - grpc_core::RefCountedPtr connected_subchannel; + grpc_connected_subchannel* connected_subchannel; /// Will be populated with context to pass to the subchannel call, if needed. grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; /// Upon success, \a *user_data will be set to whatever opaque information @@ -153,8 +153,7 @@ void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy, int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick); -/** Perform a connected subchannel ping (see \a - grpc_core::ConnectedSubchannel::Ping) +/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, grpc_closure* on_initiate, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 6c29cd82180..629f8ac58ee 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -939,7 +939,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, } gpr_free(pp); } else { - pp->pick->connected_subchannel.reset(); + pp->pick->connected_subchannel = nullptr; GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); } pp = next; @@ -976,7 +976,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { pending_pick* next = pp->next; if (pp->pick == pick) { - pick->connected_subchannel.reset(); + pick->connected_subchannel = nullptr; GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 725b78d4787..60385272cf5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -81,7 +81,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol, GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } else { - pick->connected_subchannel.reset(); + pick->connected_subchannel = nullptr; GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } } @@ -111,7 +111,7 @@ static void pf_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { grpc_lb_policy_pick_state* next = pp->next; if (pp == pick) { - pick->connected_subchannel.reset(); + pick->connected_subchannel = nullptr; GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); @@ -176,7 +176,8 @@ static int pf_pick_locked(grpc_lb_policy* pol, pick_first_lb_policy* p = (pick_first_lb_policy*)pol; // If we have a selected subchannel already, return synchronously. if (p->selected != nullptr) { - pick->connected_subchannel = p->selected->connected_subchannel; + pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "picked"); return 1; } // No subchannel selected yet, so handle asynchronously. @@ -216,7 +217,8 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { - p->selected->connected_subchannel->Ping(on_initiate, on_ack); + grpc_connected_subchannel_ping(p->selected->connected_subchannel, + on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -295,7 +297,8 @@ static void pf_update_locked(grpc_lb_policy* policy, subchannel_list->num_subchannels); } if (p->selected->connected_subchannel != nullptr) { - sd->connected_subchannel = p->selected->connected_subchannel; + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "pf_update_includes_selected"); } p->selected = sd; if (p->subchannel_list != nullptr) { @@ -407,8 +410,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || + sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -416,19 +419,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { p->started_picking = false; grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); - // in transient failure. Rely on re-resolution to recover. - p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel( - sd, "pf_selected_shutdown"); // Unrefs connected subchannel } else { grpc_connectivity_state_set(&p->state_tracker, sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); + } + if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); + } else { + p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "pf_selected_shutdown"); + grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown"); } } return; @@ -446,8 +450,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to // p->subchannel_list. - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); if (sd->subchannel_list == p->latest_pending_subchannel_list) { GPR_ASSERT(p->subchannel_list != nullptr); grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, @@ -458,6 +460,9 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); p->selected = sd; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, @@ -469,7 +474,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_policy_pick_state* pick; while ((pick = p->pending_picks)) { p->pending_picks = pick->next; - pick->connected_subchannel = p->selected->connected_subchannel; + pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "picked"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", @@ -514,8 +520,39 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } - case GRPC_CHANNEL_SHUTDOWN: - GPR_UNREACHABLE_CODE(break); + case GRPC_CHANNEL_SHUTDOWN: { + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown"); + // Advance to next subchannel and check its state. + grpc_lb_subchannel_data* original_sd = sd; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == nullptr && sd != original_sd); + if (sd == original_sd) { + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "pf_exhausted_subchannels"); + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, + "exhausted_subchannels+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, + GRPC_ERROR_NONE); + } + } else { + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set( + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "subchannel_failed"); + } + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(sd); + } + } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e217a0b0c00..c4e98206e95 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -128,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, (void*)p, (unsigned long)last_ready_index, (void*)p->subchannel_list->subchannels[last_ready_index].subchannel, (void*)p->subchannel_list->subchannels[last_ready_index] - .connected_subchannel.get()); + .connected_subchannel); } } @@ -163,7 +163,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol, GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } else { - pick->connected_subchannel.reset(); + pick->connected_subchannel = nullptr; GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } } @@ -193,7 +193,7 @@ static void rr_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { grpc_lb_policy_pick_state* next = pp->next; if (pp == pick) { - pick->connected_subchannel.reset(); + pick->connected_subchannel = nullptr; GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); @@ -217,7 +217,7 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol, grpc_lb_policy_pick_state* next = pick->next; if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - pick->connected_subchannel.reset(); + pick->connected_subchannel = nullptr; GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); @@ -263,7 +263,8 @@ static int rr_pick_locked(grpc_lb_policy* pol, /* readily available, report right away */ grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[next_ready_index]; - pick->connected_subchannel = sd->connected_subchannel; + pick->connected_subchannel = + GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); if (pick->user_data != nullptr) { *pick->user_data = sd->user_data; } @@ -272,8 +273,8 @@ static int rr_pick_locked(grpc_lb_policy* pol, GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "index %" PRIuPTR ")", - p, sd->subchannel, pick->connected_subchannel.get(), - sd->subchannel_list, next_ready_index); + p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list, + next_ready_index); } /* only advance the last picked pointer if the selection was used */ update_last_ready_subchannel_index_locked(p, next_ready_index); @@ -291,14 +292,15 @@ static int rr_pick_locked(grpc_lb_policy* pol, static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { GPR_ASSERT(subchannel_list->num_ready > 0); --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(subchannel_list->num_transient_failures > 0); --subchannel_list->num_transient_failures; + } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + GPR_ASSERT(subchannel_list->num_shutdown > 0); + --subchannel_list->num_shutdown; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(subchannel_list->num_idle > 0); --subchannel_list->num_idle; @@ -308,6 +310,8 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++subchannel_list->num_transient_failures; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + ++subchannel_list->num_shutdown; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { ++subchannel_list->num_idle; } @@ -407,7 +411,6 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // either the current or latest pending subchannel lists. GPR_ASSERT(sd->subchannel_list == p->subchannel_list || sd->subchannel_list == p->latest_pending_subchannel_list); - GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -415,17 +418,18 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // Update state counters and new overall state. update_state_counters_locked(sd); update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); - // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* - // subchannel, if any. - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: { - sd->connected_subchannel.reset(); - break; - } - case GRPC_CHANNEL_READY: { + // If the sd's new state is SHUTDOWN, unref the subchannel. + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "rr_connectivity_shutdown"); + } else { // sd not in SHUTDOWN + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { if (sd->connected_subchannel == nullptr) { - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); } if (sd->subchannel_list != p->subchannel_list) { // promote sd->subchannel_list to p->subchannel_list. @@ -468,7 +472,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_policy_pick_state* pick; while ((pick = p->pending_picks)) { p->pending_picks = pick->next; - pick->connected_subchannel = selected->connected_subchannel; + pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + selected->connected_subchannel, "rr_picked"); if (pick->user_data != nullptr) { *pick->user_data = selected->user_data; } @@ -481,15 +486,10 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { } GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } - break; } - case GRPC_CHANNEL_SHUTDOWN: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE:; // fallthrough + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -513,9 +513,10 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (next_ready_index < p->subchannel_list->num_subchannels) { grpc_lb_subchannel_data* selected = &p->subchannel_list->subchannels[next_ready_index]; - grpc_core::RefCountedPtr target = - selected->connected_subchannel; - target->Ping(on_initiate, on_ack); + grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( + selected->connected_subchannel, "rr_ping"); + grpc_connected_subchannel_ping(target, on_initiate, on_ack); + GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index fa2ffcc7966..5ce1298afc4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -42,7 +42,10 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, } GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason); sd->subchannel = nullptr; - sd->connected_subchannel.reset(); + if (sd->connected_subchannel != nullptr) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason); + sd->connected_subchannel = nullptr; + } if (sd->user_data != nullptr) { GPR_ASSERT(sd->user_data_vtable != nullptr); sd->user_data_vtable->destroy(sd->user_data); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index f4e345def6b..0f8cea9347e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -22,7 +22,6 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr++/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" // TODO(roth): This code is intended to be shared between pick_first and @@ -44,7 +43,7 @@ typedef struct { grpc_lb_subchannel_list* subchannel_list; /** subchannel itself */ grpc_subchannel* subchannel; - grpc_core::RefCountedPtr connected_subchannel; + grpc_connected_subchannel* connected_subchannel; /** Is a connectivity notification pending? */ bool connectivity_notification_pending; /** notification that connectivity has changed on subchannel */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index bb43651d0c3..83aaa09445b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -56,6 +56,10 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 +#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ + ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \ + &(subchannel)->connected_subchannel))) + namespace { struct state_watcher { grpc_closure closure; @@ -95,7 +99,7 @@ struct grpc_subchannel { grpc_connect_out_args connecting_result; /** callback for connection finishing */ - grpc_closure on_connected; + grpc_closure connected; /** callback for our alarm */ grpc_closure on_alarm; @@ -104,13 +108,12 @@ struct grpc_subchannel { being setup */ grpc_pollset_set* pollset_set; + /** active connection, or null; of type grpc_connected_subchannel */ + gpr_atm connected_subchannel; + /** mutex protecting remaining elements */ gpr_mu mu; - /** active connection, or null; of type grpc_core::ConnectedSubchannel - */ - grpc_core::RefCountedPtr connected_subchannel; - /** have we seen a disconnection? */ bool disconnected; /** are we connecting */ @@ -134,15 +137,16 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { - grpc_core::ConnectedSubchannel* connection; + grpc_connected_subchannel* connection; grpc_closure* schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1)) +#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call*)(callstack)) - 1) -static void on_subchannel_connected(void* subchannel, grpc_error* error); +static void subchannel_connected(void* subchannel, grpc_error* error); #ifndef NDEBUG #define REF_REASON reason @@ -160,9 +164,20 @@ static void on_subchannel_connected(void* subchannel, grpc_error* error); */ static void connection_destroy(void* arg, grpc_error* error) { - grpc_channel_stack* stk = (grpc_channel_stack*)arg; - grpc_channel_stack_destroy(stk); - gpr_free(stk); + grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg; + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); + gpr_free(c); +} + +grpc_connected_subchannel* grpc_connected_subchannel_ref( + grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); + return c; +} + +void grpc_connected_subchannel_unref( + grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } /* @@ -229,13 +244,18 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref( } static void disconnect(grpc_subchannel* c) { + grpc_connected_subchannel* con; grpc_subchannel_index_unregister(c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); - c->connected_subchannel.reset(); + con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); + if (con != nullptr) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection"); + gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); + } gpr_mu_unlock(&c->mu); } @@ -355,7 +375,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, if (new_args != nullptr) grpc_channel_args_destroy(new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; - GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c, + GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -379,7 +399,7 @@ static void continue_connect_locked(grpc_subchannel* c) { grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->on_connected); + &c->connected); } grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, @@ -439,7 +459,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { return; } - if (c->connected_subchannel != nullptr) { + if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) { /* Already connected: don't restart */ return; } @@ -461,10 +481,9 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { const grpc_millis time_til_next = c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { - gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c); + gpr_log(GPR_INFO, "Retry immediately"); } else { - gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c, - time_til_next); + gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); @@ -508,56 +527,75 @@ void grpc_subchannel_notify_on_state_change( } } -static void on_connected_subchannel_connectivity_changed(void* p, - grpc_error* error) { - state_watcher* connected_subchannel_watcher = (state_watcher*)p; - grpc_subchannel* c = connected_subchannel_watcher->subchannel; +void grpc_connected_subchannel_process_transport_op( + grpc_connected_subchannel* con, grpc_transport_op* op) { + grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0); + top_elem->filter->start_transport_op(top_elem, op); +} + +static void subchannel_on_child_state_changed(void* p, grpc_error* error) { + state_watcher* sw = (state_watcher*)p; + grpc_subchannel* c = sw->subchannel; gpr_mu* mu = &c->mu; gpr_mu_lock(mu); - switch (connected_subchannel_watcher->connectivity_state) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: - case GRPC_CHANNEL_SHUTDOWN: { - if (!c->disconnected && c->connected_subchannel != nullptr) { - if (grpc_trace_stream_refcount.enabled()) { - gpr_log(GPR_INFO, - "Connected subchannel %p of subchannel %p has gone into %s. " - "Attempting to reconnect.", - c->connected_subchannel.get(), c, - grpc_connectivity_state_name( - connected_subchannel_watcher->connectivity_state)); - } - c->connected_subchannel.reset(); - grpc_connectivity_state_set(&c->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "reflect_child"); - c->backoff_begun = false; - c->backoff->Reset(); - maybe_start_connecting_locked(c); - } else { - connected_subchannel_watcher->connectivity_state = - GRPC_CHANNEL_SHUTDOWN; - } - break; - } - default: { - grpc_connectivity_state_set( - &c->state_tracker, connected_subchannel_watcher->connectivity_state, - GRPC_ERROR_REF(error), "reflect_child"); - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - c->connected_subchannel->NotifyOnStateChange( - nullptr, &connected_subchannel_watcher->connectivity_state, - &connected_subchannel_watcher->closure); - connected_subchannel_watcher = nullptr; - } + /* if we failed just leave this closure */ + if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + /* any errors on a subchannel ==> we're done, create a new one */ + sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN; + } + grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state, + GRPC_ERROR_REF(error), "reflect_child"); + if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_connected_subchannel_notify_on_state_change( + GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr, + &sw->connectivity_state, &sw->closure); + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); + sw = nullptr; } + gpr_mu_unlock(mu); GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); - gpr_free(connected_subchannel_watcher); + gpr_free(sw); +} + +static void connected_subchannel_state_op(grpc_connected_subchannel* con, + grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); + elem->filter->start_transport_op(elem, op); +} + +void grpc_connected_subchannel_notify_on_state_change( + grpc_connected_subchannel* con, grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, grpc_closure* closure) { + connected_subchannel_state_op(con, interested_parties, state, closure); +} + +void grpc_connected_subchannel_ping(grpc_connected_subchannel* con, + grpc_closure* on_initiate, + grpc_closure* on_ack) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); + elem->filter->start_transport_op(elem, op); } static bool publish_transport_locked(grpc_subchannel* c) { + grpc_connected_subchannel* con; + grpc_channel_stack* stk; + state_watcher* sw_subchannel; + /* construct channel stack */ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( @@ -569,9 +607,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { grpc_channel_stack_builder_destroy(builder); return false; } - grpc_channel_stack* stk; grpc_error* error = grpc_channel_stack_builder_finish( - builder, 0, 1, connection_destroy, nullptr, (void**)&stk); + builder, 0, 1, connection_destroy, nullptr, (void**)&con); if (error != GRPC_ERROR_NONE) { grpc_transport_destroy(c->connecting_result.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", @@ -579,37 +616,38 @@ static bool publish_transport_locked(grpc_subchannel* c) { GRPC_ERROR_UNREF(error); return false; } + stk = CHANNEL_STACK_FROM_CONNECTION(con); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ - state_watcher* connected_subchannel_watcher = - (state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher)); - connected_subchannel_watcher->subchannel = c; - connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY; - GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure, - on_connected_subchannel_connectivity_changed, - connected_subchannel_watcher, grpc_schedule_on_exec_ctx); + sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel)); + sw_subchannel->subchannel = c; + sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; + GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed, + sw_subchannel, grpc_schedule_on_exec_ctx); if (c->disconnected) { - gpr_free(connected_subchannel_watcher); + gpr_free(sw_subchannel); grpc_channel_stack_destroy(stk); - gpr_free(stk); + gpr_free(con); return false; } /* publish */ - c->connected_subchannel.reset( - grpc_core::New(stk)); - gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", - c->connected_subchannel.get(), c); + /* TODO(ctiller): this full barrier seems to clear up a TSAN failure. + I'd have expected the rel_cas below to be enough, but + seemingly it's not. + Re-evaluate if we really need this. */ + gpr_atm_full_barrier(); + GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - c->connected_subchannel->NotifyOnStateChange( - c->pollset_set, &connected_subchannel_watcher->connectivity_state, - &connected_subchannel_watcher->closure); + grpc_connected_subchannel_notify_on_state_change( + con, c->pollset_set, &sw_subchannel->connectivity_state, + &sw_subchannel->closure); /* signal completion */ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, @@ -617,11 +655,11 @@ static bool publish_transport_locked(grpc_subchannel* c) { return true; } -static void on_subchannel_connected(void* arg, grpc_error* error) { +static void subchannel_connected(void* arg, grpc_error* error) { grpc_subchannel* c = (grpc_subchannel*)arg; grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; - GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected"); + GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); gpr_mu_lock(&c->mu); c->connecting = false; if (c->connecting_result.transport != nullptr && @@ -656,10 +694,10 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_core::ConnectedSubchannel* connection = c->connection; + grpc_connected_subchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); - connection->Unref(DEBUG_LOCATION, "subchannel_call"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } @@ -690,12 +728,9 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call, GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } -grpc_core::RefCountedPtr -grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { - gpr_mu_lock(&c->mu); - auto copy = c->connected_subchannel; - gpr_mu_unlock(&c->mu); - return copy; +grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( + grpc_subchannel* c) { + return GET_CONNECTED_SUBCHANNEL(c, acq); } const grpc_subchannel_key* grpc_subchannel_get_key( @@ -703,6 +738,36 @@ const grpc_subchannel_key* grpc_subchannel_get_key( return subchannel->key; } +grpc_error* grpc_connected_subchannel_create_call( + grpc_connected_subchannel* con, + const grpc_connected_subchannel_call_args* args, + grpc_subchannel_call** call) { + grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con); + *call = (grpc_subchannel_call*)gpr_arena_alloc( + args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); + (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); + const grpc_call_element_args call_args = { + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args->context, /* context */ + args->path, /* path */ + args->start_time, /* start_time */ + args->deadline, /* deadline */ + args->arena, /* arena */ + args->call_combiner /* call_combiner */ + }; + grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy, + *call, &call_args); + if (error != GRPC_ERROR_NONE) { + const char* error_string = grpc_error_string(error); + gpr_log(GPR_ERROR, "error: %s", error_string); + return error; + } + grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); + return GRPC_ERROR_NONE; +} + grpc_call_stack* grpc_subchannel_call_get_call_stack( grpc_subchannel_call* subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); @@ -738,64 +803,3 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } - -namespace grpc_core { -ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) - : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount), - channel_stack_(channel_stack) {} - -ConnectedSubchannel::~ConnectedSubchannel() { - GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); -} - -void ConnectedSubchannel::NotifyOnStateChange( - grpc_pollset_set* interested_parties, grpc_connectivity_state* state, - grpc_closure* closure) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->connectivity_state = state; - op->on_connectivity_state_change = closure; - op->bind_pollset_set = interested_parties; - elem = grpc_channel_stack_element(channel_stack_, 0); - elem->filter->start_transport_op(elem, op); -} - -void ConnectedSubchannel::Ping(grpc_closure* on_initiate, - grpc_closure* on_ack) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->send_ping.on_initiate = on_initiate; - op->send_ping.on_ack = on_ack; - elem = grpc_channel_stack_element(channel_stack_, 0); - elem->filter->start_transport_op(elem, op); -} - -grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, - grpc_subchannel_call** call) { - *call = (grpc_subchannel_call*)gpr_arena_alloc( - args.arena, - sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size); - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - Ref(DEBUG_LOCATION, "subchannel_call"); - (*call)->connection = this; - const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args.context, /* context */ - args.path, /* path */ - args.start_time, /* start_time */ - args.deadline, /* deadline */ - args.arena, /* arena */ - args.call_combiner /* call_combiner */ - }; - grpc_error* error = grpc_call_stack_init( - channel_stack_, 1, subchannel_call_destroy, *call, &call_args); - if (error != GRPC_ERROR_NONE) { - const char* error_string = grpc_error_string(error); - gpr_log(GPR_ERROR, "error: %s", error_string); - return error; - } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); - return GRPC_ERROR_NONE; -} -} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index f2a5c1e2732..65d78252beb 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -34,6 +34,7 @@ /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; +typedef struct grpc_connected_subchannel grpc_connected_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; typedef struct grpc_subchannel_key grpc_subchannel_key; @@ -49,6 +50,10 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \ grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \ + grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ + grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) \ grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ @@ -62,39 +67,14 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p)) +#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) +#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ + grpc_connected_subchannel_unref((p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif -namespace grpc_core { -class ConnectedSubchannel : public grpc_core::RefCountedWithTracing { - public: - struct CallArgs { - grpc_polling_entity* pollent; - grpc_slice path; - gpr_timespec start_time; - grpc_millis deadline; - gpr_arena* arena; - grpc_call_context_element* context; - grpc_call_combiner* call_combiner; - }; - - explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); - ~ConnectedSubchannel(); - - grpc_channel_stack* channel_stack() { return channel_stack_; } - void NotifyOnStateChange(grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, - grpc_closure* closure); - void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); - grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); - - private: - grpc_channel_stack* channel_stack_; -}; -} // namespace grpc_core - grpc_subchannel* grpc_subchannel_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel* grpc_subchannel_ref_from_weak_ref( @@ -105,11 +85,35 @@ grpc_subchannel* grpc_subchannel_weak_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_connected_subchannel* grpc_connected_subchannel_ref( + grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_connected_subchannel_unref( + grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +/** construct a subchannel call */ +typedef struct { + grpc_polling_entity* pollent; + grpc_slice path; + gpr_timespec start_time; + grpc_millis deadline; + gpr_arena* arena; + grpc_call_context_element* context; + grpc_call_combiner* call_combiner; +} grpc_connected_subchannel_call_args; + +grpc_error* grpc_connected_subchannel_create_call( + grpc_connected_subchannel* connected_subchannel, + const grpc_connected_subchannel_call_args* args, + grpc_subchannel_call** subchannel_call); + +/** process a transport level op */ +void grpc_connected_subchannel_process_transport_op( + grpc_connected_subchannel* subchannel, grpc_transport_op* op); + /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_subchannel* channel, grpc_error** error); @@ -119,12 +123,17 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( void grpc_subchannel_notify_on_state_change( grpc_subchannel* channel, grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* notify); - -/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected - * (which may happen before it initially connects or during transient failures) - * */ -grpc_core::RefCountedPtr -grpc_subchannel_get_connected_subchannel(grpc_subchannel* c); +void grpc_connected_subchannel_notify_on_state_change( + grpc_connected_subchannel* channel, grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, grpc_closure* notify); +void grpc_connected_subchannel_ping(grpc_connected_subchannel* channel, + grpc_closure* on_initiate, + grpc_closure* on_ack); + +/** retrieve the grpc_connected_subchannel - or NULL if called before + the subchannel becomes connected */ +grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( + grpc_subchannel* subchannel); /** return the subchannel index key for \a subchannel */ const grpc_subchannel_key* grpc_subchannel_get_key( diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 328ad860615..7f82330ebe0 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -676,42 +676,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { GPR_ASSERT(gpr_time_cmp(deadline, now) > 0); } -TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { - const int kNumServers = 3; - StartServers(kNumServers); - const auto ports = GetServersPorts(); - ResetStub(ports, "round_robin"); - SetNextResolution(ports); - for (size_t i = 0; i < kNumServers; ++i) WaitForServer(i); - for (size_t i = 0; i < servers_.size(); ++i) { - CheckRpcSendOk(); - EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; - } - // One request should have gone to each server. - for (size_t i = 0; i < servers_.size(); ++i) { - EXPECT_EQ(1, servers_[i]->service_.request_count()); - } - const auto pre_death = servers_[0]->service_.request_count(); - // Kill the first server. - servers_[0]->Shutdown(true); - // Client request still succeed. May need retrying if RR had returned a pick - // before noticing the change in the server's connectivity. - while (!SendRpc()) - ; // Retry until success. - // Send a bunch of RPCs that should succeed. - for (int i = 0; i < 10 * kNumServers; ++i) CheckRpcSendOk(); - const auto post_death = servers_[0]->service_.request_count(); - // No requests have gone to the deceased server. - EXPECT_EQ(pre_death, post_death); - // Bring the first server back up. - servers_[0].reset(new ServerData(server_host_, ports[0])); - // Requests should start arriving at the first server either right away (if - // the server managed to start before the RR policy retried the subchannel) or - // after the subchannel retry delay otherwise (RR's subchannel retried before - // the server was fully back up). - WaitForServer(0); -} - } // namespace } // namespace testing } // namespace grpc