Merge pull request #14185 from nicolasnoble/revert-13932

Revert "Merge pull request #13932 from dgquintas/conn_subchannel"
pull/14199/head
David G. Quintas 7 years ago committed by GitHub
commit 357dc87d2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      src/core/ext/filters/client_channel/client_channel.cc
  2. 7
      src/core/ext/filters/client_channel/lb_policy.h
  3. 4
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  4. 75
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  5. 63
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  6. 5
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
  7. 3
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  8. 296
      src/core/ext/filters/client_channel/subchannel.cc
  9. 77
      src/core/ext/filters/client_channel/subchannel.h
  10. 36
      test/cpp/end2end/client_lb_end2end_test.cc

@ -1003,7 +1003,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
grpc_error* error) { grpc_error* error) {
channel_data* chand = (channel_data*)elem->channel_data; channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data; call_data* calld = (call_data*)elem->call_data;
const grpc_core::ConnectedSubchannel::CallArgs call_args = { const grpc_connected_subchannel_call_args call_args = {
calld->pollent, // pollent calld->pollent, // pollent
calld->path, // path calld->path, // path
calld->call_start_time, // start_time calld->call_start_time, // start_time
@ -1012,8 +1012,8 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
calld->pick.subchannel_call_context, // context calld->pick.subchannel_call_context, // context
calld->call_combiner // call_combiner calld->call_combiner // call_combiner
}; };
grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( grpc_error* new_error = grpc_connected_subchannel_create_call(
call_args, &calld->subchannel_call); calld->pick.connected_subchannel, &call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) { if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error)); chand, calld, calld->subchannel_call, grpc_error_string(new_error));
@ -1463,7 +1463,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
} }
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->pick.connected_subchannel != nullptr) { if (calld->pick.connected_subchannel != nullptr) {
calld->pick.connected_subchannel.reset(); GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked");
} }
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
if (calld->pick.subchannel_call_context[i].value != nullptr) { if (calld->pick.subchannel_call_context[i].value != nullptr) {

@ -55,9 +55,9 @@ typedef struct grpc_lb_policy_pick_state {
grpc_linked_mdelem lb_token_mdelem_storage; grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously. /// Closure to run when pick is complete, if not completed synchronously.
grpc_closure* on_complete; grpc_closure* on_complete;
/// Will be set to the selected subchannel, or nullptr on failure or when /// Will be set to the selected subchannel, or NULL on failure or when
/// the LB policy decides to drop the call. /// the LB policy decides to drop the call.
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; grpc_connected_subchannel* connected_subchannel;
/// Will be populated with context to pass to the subchannel call, if needed. /// Will be populated with context to pass to the subchannel call, if needed.
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
/// Upon success, \a *user_data will be set to whatever opaque information /// Upon success, \a *user_data will be set to whatever opaque information
@ -153,8 +153,7 @@ void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
grpc_lb_policy_pick_state* pick); grpc_lb_policy_pick_state* pick);
/** Perform a connected subchannel ping (see \a /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
grpc_core::ConnectedSubchannel::Ping)
against one of the connected subchannels managed by \a policy. */ against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
grpc_closure* on_initiate, grpc_closure* on_initiate,

@ -939,7 +939,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
} }
gpr_free(pp); gpr_free(pp);
} else { } else {
pp->pick->connected_subchannel.reset(); pp->pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
} }
pp = next; pp = next;
@ -976,7 +976,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) { while (pp != nullptr) {
pending_pick* next = pp->next; pending_pick* next = pp->next;
if (pp->pick == pick) { if (pp->pick == pick) {
pick->connected_subchannel.reset(); pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1)); "Pick Cancelled", &error, 1));

@ -81,7 +81,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol,
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
} }
} else { } else {
pick->connected_subchannel.reset(); pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
} }
} }
@ -111,7 +111,7 @@ static void pf_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) { while (pp != nullptr) {
grpc_lb_policy_pick_state* next = pp->next; grpc_lb_policy_pick_state* next = pp->next;
if (pp == pick) { if (pp == pick) {
pick->connected_subchannel.reset(); pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1)); "Pick Cancelled", &error, 1));
@ -176,7 +176,8 @@ static int pf_pick_locked(grpc_lb_policy* pol,
pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
// If we have a selected subchannel already, return synchronously. // If we have a selected subchannel already, return synchronously.
if (p->selected != nullptr) { if (p->selected != nullptr) {
pick->connected_subchannel = p->selected->connected_subchannel; pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "picked");
return 1; return 1;
} }
// No subchannel selected yet, so handle asynchronously. // No subchannel selected yet, so handle asynchronously.
@ -216,7 +217,8 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
grpc_closure* on_ack) { grpc_closure* on_ack) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (p->selected) { if (p->selected) {
p->selected->connected_subchannel->Ping(on_initiate, on_ack); grpc_connected_subchannel_ping(p->selected->connected_subchannel,
on_initiate, on_ack);
} else { } else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@ -295,7 +297,8 @@ static void pf_update_locked(grpc_lb_policy* policy,
subchannel_list->num_subchannels); subchannel_list->num_subchannels);
} }
if (p->selected->connected_subchannel != nullptr) { if (p->selected->connected_subchannel != nullptr) {
sd->connected_subchannel = p->selected->connected_subchannel; sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "pf_update_includes_selected");
} }
p->selected = sd; p->selected = sd;
if (p->subchannel_list != nullptr) { if (p->subchannel_list != nullptr) {
@ -407,8 +410,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// re-resolution is introduced. But we need to investigate whether we // re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected // really want to take any action instead of waiting for the selected
// subchannel reconnecting. // subchannel reconnecting.
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution. // If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, GRPC_ERROR_NONE,
@ -416,19 +419,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
p->started_picking = false; p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
// in transient failure. Rely on re-resolution to recover.
p->selected = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
sd->subchannel_list, "pf_selected_shutdown");
grpc_lb_subchannel_data_unref_subchannel(
sd, "pf_selected_shutdown"); // Unrefs connected subchannel
} else { } else {
grpc_connectivity_state_set(&p->state_tracker, grpc_connectivity_state_set(&p->state_tracker,
sd->curr_connectivity_state, sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed"); GRPC_ERROR_REF(error), "selected_changed");
}
if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification. // Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd); grpc_lb_subchannel_data_start_connectivity_watch(sd);
} else {
p->selected = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
sd->subchannel_list, "pf_selected_shutdown");
grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown");
} }
} }
return; return;
@ -446,8 +450,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_READY: { case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to // Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list. // p->subchannel_list.
sd->connected_subchannel =
grpc_subchannel_get_connected_subchannel(sd->subchannel);
if (sd->subchannel_list == p->latest_pending_subchannel_list) { if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr); GPR_ASSERT(p->subchannel_list != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
@ -458,6 +460,9 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Cases 1 and 2. // Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready"); GRPC_ERROR_NONE, "connecting_ready");
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel),
"connected");
p->selected = sd; p->selected = sd;
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
@ -469,7 +474,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_policy_pick_state* pick; grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks)) { while ((pick = p->pending_picks)) {
p->pending_picks = pick->next; p->pending_picks = pick->next;
pick->connected_subchannel = p->selected->connected_subchannel; pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "picked");
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p", "Servicing pending pick with selected subchannel %p",
@ -514,8 +520,39 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data_start_connectivity_watch(sd); grpc_lb_subchannel_data_start_connectivity_watch(sd);
break; break;
} }
case GRPC_CHANNEL_SHUTDOWN: case GRPC_CHANNEL_SHUTDOWN: {
GPR_UNREACHABLE_CODE(break); grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
// Advance to next subchannel and check its state.
grpc_lb_subchannel_data* original_sd = sd;
do {
sd->subchannel_list->checking_subchannel =
(sd->subchannel_list->checking_subchannel + 1) %
sd->subchannel_list->num_subchannels;
sd = &sd->subchannel_list
->subchannels[sd->subchannel_list->checking_subchannel];
} while (sd->subchannel == nullptr && sd != original_sd);
if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
sd->subchannel_list, "pf_exhausted_subchannels");
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"exhausted_subchannels+reresolve");
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE);
}
} else {
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
}
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
}
} }
} }

@ -128,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
(void*)p, (unsigned long)last_ready_index, (void*)p, (unsigned long)last_ready_index,
(void*)p->subchannel_list->subchannels[last_ready_index].subchannel, (void*)p->subchannel_list->subchannels[last_ready_index].subchannel,
(void*)p->subchannel_list->subchannels[last_ready_index] (void*)p->subchannel_list->subchannels[last_ready_index]
.connected_subchannel.get()); .connected_subchannel);
} }
} }
@ -163,7 +163,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol,
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
} }
} else { } else {
pick->connected_subchannel.reset(); pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
} }
} }
@ -193,7 +193,7 @@ static void rr_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) { while (pp != nullptr) {
grpc_lb_policy_pick_state* next = pp->next; grpc_lb_policy_pick_state* next = pp->next;
if (pp == pick) { if (pp == pick) {
pick->connected_subchannel.reset(); pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1)); "Pick cancelled", &error, 1));
@ -217,7 +217,7 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol,
grpc_lb_policy_pick_state* next = pick->next; grpc_lb_policy_pick_state* next = pick->next;
if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) { initial_metadata_flags_eq) {
pick->connected_subchannel.reset(); pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1)); "Pick cancelled", &error, 1));
@ -263,7 +263,8 @@ static int rr_pick_locked(grpc_lb_policy* pol,
/* readily available, report right away */ /* readily available, report right away */
grpc_lb_subchannel_data* sd = grpc_lb_subchannel_data* sd =
&p->subchannel_list->subchannels[next_ready_index]; &p->subchannel_list->subchannels[next_ready_index];
pick->connected_subchannel = sd->connected_subchannel; pick->connected_subchannel =
GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
if (pick->user_data != nullptr) { if (pick->user_data != nullptr) {
*pick->user_data = sd->user_data; *pick->user_data = sd->user_data;
} }
@ -272,8 +273,8 @@ static int rr_pick_locked(grpc_lb_policy* pol,
GPR_DEBUG, GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %" PRIuPTR ")", "index %" PRIuPTR ")",
p, sd->subchannel, pick->connected_subchannel.get(), p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list,
sd->subchannel_list, next_ready_index); next_ready_index);
} }
/* only advance the last picked pointer if the selection was used */ /* only advance the last picked pointer if the selection was used */
update_last_ready_subchannel_index_locked(p, next_ready_index); update_last_ready_subchannel_index_locked(p, next_ready_index);
@ -291,14 +292,15 @@ static int rr_pick_locked(grpc_lb_policy* pol,
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0); GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready; --subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0); GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures; --subchannel_list->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
GPR_ASSERT(subchannel_list->num_shutdown > 0);
--subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0); GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle; --subchannel_list->num_idle;
@ -308,6 +310,8 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
++subchannel_list->num_ready; ++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures; ++subchannel_list->num_transient_failures;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle; ++subchannel_list->num_idle;
} }
@ -407,7 +411,6 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// either the current or latest pending subchannel lists. // either the current or latest pending subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->subchannel_list || GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
sd->subchannel_list == p->latest_pending_subchannel_list); sd->subchannel_list == p->latest_pending_subchannel_list);
GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
// Now that we're inside the combiner, copy the pending connectivity // Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to // state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner. // curr_connectivity_state, which is what we use inside of the combiner.
@ -415,17 +418,18 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// Update state counters and new overall state. // Update state counters and new overall state.
update_state_counters_locked(sd); update_state_counters_locked(sd);
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected* // If the sd's new state is SHUTDOWN, unref the subchannel.
// subchannel, if any. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
switch (sd->curr_connectivity_state) { grpc_lb_subchannel_data_stop_connectivity_watch(sd);
case GRPC_CHANNEL_TRANSIENT_FAILURE: { grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
sd->connected_subchannel.reset(); grpc_lb_subchannel_list_unref_for_connectivity_watch(
break; sd->subchannel_list, "rr_connectivity_shutdown");
} } else { // sd not in SHUTDOWN
case GRPC_CHANNEL_READY: { if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
if (sd->connected_subchannel == nullptr) { if (sd->connected_subchannel == nullptr) {
sd->connected_subchannel = sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel); grpc_subchannel_get_connected_subchannel(sd->subchannel),
"connected");
} }
if (sd->subchannel_list != p->subchannel_list) { if (sd->subchannel_list != p->subchannel_list) {
// promote sd->subchannel_list to p->subchannel_list. // promote sd->subchannel_list to p->subchannel_list.
@ -468,7 +472,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_policy_pick_state* pick; grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks)) { while ((pick = p->pending_picks)) {
p->pending_picks = pick->next; p->pending_picks = pick->next;
pick->connected_subchannel = selected->connected_subchannel; pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel, "rr_picked");
if (pick->user_data != nullptr) { if (pick->user_data != nullptr) {
*pick->user_data = selected->user_data; *pick->user_data = selected->user_data;
} }
@ -481,15 +486,10 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
} }
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
} }
break;
} }
case GRPC_CHANNEL_SHUTDOWN: // Renew notification.
GPR_UNREACHABLE_CODE(return ); grpc_lb_subchannel_data_start_connectivity_watch(sd);
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:; // fallthrough
} }
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
} }
static grpc_connectivity_state rr_check_connectivity_locked( static grpc_connectivity_state rr_check_connectivity_locked(
@ -513,9 +513,10 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (next_ready_index < p->subchannel_list->num_subchannels) { if (next_ready_index < p->subchannel_list->num_subchannels) {
grpc_lb_subchannel_data* selected = grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index]; &p->subchannel_list->subchannels[next_ready_index];
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target = grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel; selected->connected_subchannel, "rr_ping");
target->Ping(on_initiate, on_ack); grpc_connected_subchannel_ping(target, on_initiate, on_ack);
GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
} else { } else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected")); "Round Robin not connected"));

@ -42,7 +42,10 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
} }
GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason); GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
sd->subchannel = nullptr; sd->subchannel = nullptr;
sd->connected_subchannel.reset(); if (sd->connected_subchannel != nullptr) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason);
sd->connected_subchannel = nullptr;
}
if (sd->user_data != nullptr) { if (sd->user_data != nullptr) {
GPR_ASSERT(sd->user_data_vtable != nullptr); GPR_ASSERT(sd->user_data_vtable != nullptr);
sd->user_data_vtable->destroy(sd->user_data); sd->user_data_vtable->destroy(sd->user_data);

@ -22,7 +22,6 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr++/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
// TODO(roth): This code is intended to be shared between pick_first and // TODO(roth): This code is intended to be shared between pick_first and
@ -44,7 +43,7 @@ typedef struct {
grpc_lb_subchannel_list* subchannel_list; grpc_lb_subchannel_list* subchannel_list;
/** subchannel itself */ /** subchannel itself */
grpc_subchannel* subchannel; grpc_subchannel* subchannel;
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; grpc_connected_subchannel* connected_subchannel;
/** Is a connectivity notification pending? */ /** Is a connectivity notification pending? */
bool connectivity_notification_pending; bool connectivity_notification_pending;
/** notification that connectivity has changed on subchannel */ /** notification that connectivity has changed on subchannel */

@ -56,6 +56,10 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \
&(subchannel)->connected_subchannel)))
namespace { namespace {
struct state_watcher { struct state_watcher {
grpc_closure closure; grpc_closure closure;
@ -95,7 +99,7 @@ struct grpc_subchannel {
grpc_connect_out_args connecting_result; grpc_connect_out_args connecting_result;
/** callback for connection finishing */ /** callback for connection finishing */
grpc_closure on_connected; grpc_closure connected;
/** callback for our alarm */ /** callback for our alarm */
grpc_closure on_alarm; grpc_closure on_alarm;
@ -104,13 +108,12 @@ struct grpc_subchannel {
being setup */ being setup */
grpc_pollset_set* pollset_set; grpc_pollset_set* pollset_set;
/** active connection, or null; of type grpc_connected_subchannel */
gpr_atm connected_subchannel;
/** mutex protecting remaining elements */ /** mutex protecting remaining elements */
gpr_mu mu; gpr_mu mu;
/** active connection, or null; of type grpc_core::ConnectedSubchannel
*/
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
/** have we seen a disconnection? */ /** have we seen a disconnection? */
bool disconnected; bool disconnected;
/** are we connecting */ /** are we connecting */
@ -134,15 +137,16 @@ struct grpc_subchannel {
}; };
struct grpc_subchannel_call { struct grpc_subchannel_call {
grpc_core::ConnectedSubchannel* connection; grpc_connected_subchannel* connection;
grpc_closure* schedule_closure_after_destroy; grpc_closure* schedule_closure_after_destroy;
}; };
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1)) #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call*)(callstack)) - 1) (((grpc_subchannel_call*)(callstack)) - 1)
static void on_subchannel_connected(void* subchannel, grpc_error* error); static void subchannel_connected(void* subchannel, grpc_error* error);
#ifndef NDEBUG #ifndef NDEBUG
#define REF_REASON reason #define REF_REASON reason
@ -160,9 +164,20 @@ static void on_subchannel_connected(void* subchannel, grpc_error* error);
*/ */
static void connection_destroy(void* arg, grpc_error* error) { static void connection_destroy(void* arg, grpc_error* error) {
grpc_channel_stack* stk = (grpc_channel_stack*)arg; grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg;
grpc_channel_stack_destroy(stk); grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(stk); gpr_free(c);
}
grpc_connected_subchannel* grpc_connected_subchannel_ref(
grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
return c;
}
void grpc_connected_subchannel_unref(
grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
} }
/* /*
@ -229,13 +244,18 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
} }
static void disconnect(grpc_subchannel* c) { static void disconnect(grpc_subchannel* c) {
grpc_connected_subchannel* con;
grpc_subchannel_index_unregister(c->key, c); grpc_subchannel_index_unregister(c->key, c);
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected); GPR_ASSERT(!c->disconnected);
c->disconnected = true; c->disconnected = true;
grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected")); "Subchannel disconnected"));
c->connected_subchannel.reset(); con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != nullptr) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection");
gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
}
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
} }
@ -355,7 +375,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
if (new_args != nullptr) grpc_channel_args_destroy(new_args); if (new_args != nullptr) grpc_channel_args_destroy(new_args);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev = c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher; &c->root_external_state_watcher;
GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c, GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel"); "subchannel");
@ -379,7 +399,7 @@ static void continue_connect_locked(grpc_subchannel* c) {
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "state_change"); GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result, grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected); &c->connected);
} }
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
@ -439,7 +459,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
return; return;
} }
if (c->connected_subchannel != nullptr) { if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) {
/* Already connected: don't restart */ /* Already connected: don't restart */
return; return;
} }
@ -461,10 +481,9 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
const grpc_millis time_til_next = const grpc_millis time_til_next =
c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
if (time_til_next <= 0) { if (time_til_next <= 0) {
gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c); gpr_log(GPR_INFO, "Retry immediately");
} else { } else {
gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c, gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next);
time_til_next);
} }
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
@ -508,56 +527,75 @@ void grpc_subchannel_notify_on_state_change(
} }
} }
static void on_connected_subchannel_connectivity_changed(void* p, void grpc_connected_subchannel_process_transport_op(
grpc_error* error) { grpc_connected_subchannel* con, grpc_transport_op* op) {
state_watcher* connected_subchannel_watcher = (state_watcher*)p; grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel* c = connected_subchannel_watcher->subchannel; grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0);
top_elem->filter->start_transport_op(top_elem, op);
}
static void subchannel_on_child_state_changed(void* p, grpc_error* error) {
state_watcher* sw = (state_watcher*)p;
grpc_subchannel* c = sw->subchannel;
gpr_mu* mu = &c->mu; gpr_mu* mu = &c->mu;
gpr_mu_lock(mu); gpr_mu_lock(mu);
switch (connected_subchannel_watcher->connectivity_state) { /* if we failed just leave this closure */
case GRPC_CHANNEL_TRANSIENT_FAILURE: if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
case GRPC_CHANNEL_SHUTDOWN: { /* any errors on a subchannel ==> we're done, create a new one */
if (!c->disconnected && c->connected_subchannel != nullptr) { sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
if (grpc_trace_stream_refcount.enabled()) { }
gpr_log(GPR_INFO, grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state,
"Connected subchannel %p of subchannel %p has gone into %s. " GRPC_ERROR_REF(error), "reflect_child");
"Attempting to reconnect.", if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
c->connected_subchannel.get(), c, grpc_connected_subchannel_notify_on_state_change(
grpc_connectivity_state_name( GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr,
connected_subchannel_watcher->connectivity_state)); &sw->connectivity_state, &sw->closure);
} GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
c->connected_subchannel.reset(); sw = nullptr;
grpc_connectivity_state_set(&c->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "reflect_child");
c->backoff_begun = false;
c->backoff->Reset();
maybe_start_connecting_locked(c);
} else {
connected_subchannel_watcher->connectivity_state =
GRPC_CHANNEL_SHUTDOWN;
}
break;
}
default: {
grpc_connectivity_state_set(
&c->state_tracker, connected_subchannel_watcher->connectivity_state,
GRPC_ERROR_REF(error), "reflect_child");
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
c->connected_subchannel->NotifyOnStateChange(
nullptr, &connected_subchannel_watcher->connectivity_state,
&connected_subchannel_watcher->closure);
connected_subchannel_watcher = nullptr;
}
} }
gpr_mu_unlock(mu); gpr_mu_unlock(mu);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
gpr_free(connected_subchannel_watcher); gpr_free(sw);
}
static void connected_subchannel_state_op(grpc_connected_subchannel* con,
grpc_pollset_set* interested_parties,
grpc_connectivity_state* state,
grpc_closure* closure) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->connectivity_state = state;
op->on_connectivity_state_change = closure;
op->bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
elem->filter->start_transport_op(elem, op);
}
void grpc_connected_subchannel_notify_on_state_change(
grpc_connected_subchannel* con, grpc_pollset_set* interested_parties,
grpc_connectivity_state* state, grpc_closure* closure) {
connected_subchannel_state_op(con, interested_parties, state, closure);
}
void grpc_connected_subchannel_ping(grpc_connected_subchannel* con,
grpc_closure* on_initiate,
grpc_closure* on_ack) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->send_ping.on_initiate = on_initiate;
op->send_ping.on_ack = on_ack;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
elem->filter->start_transport_op(elem, op);
} }
static bool publish_transport_locked(grpc_subchannel* c) { static bool publish_transport_locked(grpc_subchannel* c) {
grpc_connected_subchannel* con;
grpc_channel_stack* stk;
state_watcher* sw_subchannel;
/* construct channel stack */ /* construct channel stack */
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments( grpc_channel_stack_builder_set_channel_arguments(
@ -569,9 +607,8 @@ static bool publish_transport_locked(grpc_subchannel* c) {
grpc_channel_stack_builder_destroy(builder); grpc_channel_stack_builder_destroy(builder);
return false; return false;
} }
grpc_channel_stack* stk;
grpc_error* error = grpc_channel_stack_builder_finish( grpc_error* error = grpc_channel_stack_builder_finish(
builder, 0, 1, connection_destroy, nullptr, (void**)&stk); builder, 0, 1, connection_destroy, nullptr, (void**)&con);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
grpc_transport_destroy(c->connecting_result.transport); grpc_transport_destroy(c->connecting_result.transport);
gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
@ -579,37 +616,38 @@ static bool publish_transport_locked(grpc_subchannel* c) {
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return false; return false;
} }
stk = CHANNEL_STACK_FROM_CONNECTION(con);
memset(&c->connecting_result, 0, sizeof(c->connecting_result)); memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */ /* initialize state watcher */
state_watcher* connected_subchannel_watcher = sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel));
(state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher)); sw_subchannel->subchannel = c;
connected_subchannel_watcher->subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY; GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed,
GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure, sw_subchannel, grpc_schedule_on_exec_ctx);
on_connected_subchannel_connectivity_changed,
connected_subchannel_watcher, grpc_schedule_on_exec_ctx);
if (c->disconnected) { if (c->disconnected) {
gpr_free(connected_subchannel_watcher); gpr_free(sw_subchannel);
grpc_channel_stack_destroy(stk); grpc_channel_stack_destroy(stk);
gpr_free(stk); gpr_free(con);
return false; return false;
} }
/* publish */ /* publish */
c->connected_subchannel.reset( /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); I'd have expected the rel_cas below to be enough, but
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", seemingly it's not.
c->connected_subchannel.get(), c); Re-evaluate if we really need this. */
gpr_atm_full_barrier();
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
/* setup subchannel watching connected subchannel for changes; subchannel /* setup subchannel watching connected subchannel for changes; subchannel
ref for connecting is donated to the state watcher */ ref for connecting is donated to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
c->connected_subchannel->NotifyOnStateChange( grpc_connected_subchannel_notify_on_state_change(
c->pollset_set, &connected_subchannel_watcher->connectivity_state, con, c->pollset_set, &sw_subchannel->connectivity_state,
&connected_subchannel_watcher->closure); &sw_subchannel->closure);
/* signal completion */ /* signal completion */
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY,
@ -617,11 +655,11 @@ static bool publish_transport_locked(grpc_subchannel* c) {
return true; return true;
} }
static void on_subchannel_connected(void* arg, grpc_error* error) { static void subchannel_connected(void* arg, grpc_error* error) {
grpc_subchannel* c = (grpc_subchannel*)arg; grpc_subchannel* c = (grpc_subchannel*)arg;
grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;
GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected"); GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
c->connecting = false; c->connecting = false;
if (c->connecting_result.transport != nullptr && if (c->connecting_result.transport != nullptr &&
@ -656,10 +694,10 @@ static void subchannel_call_destroy(void* call, grpc_error* error) {
grpc_subchannel_call* c = (grpc_subchannel_call*)call; grpc_subchannel_call* c = (grpc_subchannel_call*)call;
GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); GPR_ASSERT(c->schedule_closure_after_destroy != nullptr);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_core::ConnectedSubchannel* connection = c->connection; grpc_connected_subchannel* connection = c->connection;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
c->schedule_closure_after_destroy); c->schedule_closure_after_destroy);
connection->Unref(DEBUG_LOCATION, "subchannel_call"); GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
} }
@ -690,12 +728,9 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
GPR_TIMER_END("grpc_subchannel_call_process_op", 0); GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
} }
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { grpc_subchannel* c) {
gpr_mu_lock(&c->mu); return GET_CONNECTED_SUBCHANNEL(c, acq);
auto copy = c->connected_subchannel;
gpr_mu_unlock(&c->mu);
return copy;
} }
const grpc_subchannel_key* grpc_subchannel_get_key( const grpc_subchannel_key* grpc_subchannel_get_key(
@ -703,6 +738,36 @@ const grpc_subchannel_key* grpc_subchannel_get_key(
return subchannel->key; return subchannel->key;
} }
grpc_error* grpc_connected_subchannel_create_call(
grpc_connected_subchannel* con,
const grpc_connected_subchannel_call_args* args,
grpc_subchannel_call** call) {
grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = (grpc_subchannel_call*)gpr_arena_alloc(
args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
const grpc_call_element_args call_args = {
callstk, /* call_stack */
nullptr, /* server_transport_data */
args->context, /* context */
args->path, /* path */
args->start_time, /* start_time */
args->deadline, /* deadline */
args->arena, /* arena */
args->call_combiner /* call_combiner */
};
grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy,
*call, &call_args);
if (error != GRPC_ERROR_NONE) {
const char* error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
return error;
}
grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent);
return GRPC_ERROR_NONE;
}
grpc_call_stack* grpc_subchannel_call_get_call_stack( grpc_call_stack* grpc_subchannel_call_get_call_stack(
grpc_subchannel_call* subchannel_call) { grpc_subchannel_call* subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
@ -738,64 +803,3 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
(char*)GRPC_ARG_SUBCHANNEL_ADDRESS, (char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
} }
namespace grpc_core {
ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
: grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount),
channel_stack_(channel_stack) {}
ConnectedSubchannel::~ConnectedSubchannel() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
void ConnectedSubchannel::NotifyOnStateChange(
grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
grpc_closure* closure) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->connectivity_state = state;
op->on_connectivity_state_change = closure;
op->bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
grpc_closure* on_ack) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->send_ping.on_initiate = on_initiate;
op->send_ping.on_ack = on_ack;
elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
grpc_subchannel_call** call) {
*call = (grpc_subchannel_call*)gpr_arena_alloc(
args.arena,
sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size);
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
Ref(DEBUG_LOCATION, "subchannel_call");
(*call)->connection = this;
const grpc_call_element_args call_args = {
callstk, /* call_stack */
nullptr, /* server_transport_data */
args.context, /* context */
args.path, /* path */
args.start_time, /* start_time */
args.deadline, /* deadline */
args.arena, /* arena */
args.call_combiner /* call_combiner */
};
grpc_error* error = grpc_call_stack_init(
channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
if (error != GRPC_ERROR_NONE) {
const char* error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
return error;
}
grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
return GRPC_ERROR_NONE;
}
} // namespace grpc_core

@ -34,6 +34,7 @@
/** A (sub-)channel that knows how to connect to exactly one target /** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */ address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_connected_subchannel grpc_connected_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args; typedef struct grpc_subchannel_args grpc_subchannel_args;
typedef struct grpc_subchannel_key grpc_subchannel_key; typedef struct grpc_subchannel_key grpc_subchannel_key;
@ -49,6 +50,10 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r)) grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \ #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \
grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r)) grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r))
#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \ #define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \
@ -62,39 +67,14 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p))
#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
grpc_connected_subchannel_unref((p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif #endif
namespace grpc_core {
class ConnectedSubchannel : public grpc_core::RefCountedWithTracing {
public:
struct CallArgs {
grpc_polling_entity* pollent;
grpc_slice path;
gpr_timespec start_time;
grpc_millis deadline;
gpr_arena* arena;
grpc_call_context_element* context;
grpc_call_combiner* call_combiner;
};
explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
~ConnectedSubchannel();
grpc_channel_stack* channel_stack() { return channel_stack_; }
void NotifyOnStateChange(grpc_pollset_set* interested_parties,
grpc_connectivity_state* state,
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
private:
grpc_channel_stack* channel_stack_;
};
} // namespace grpc_core
grpc_subchannel* grpc_subchannel_ref( grpc_subchannel* grpc_subchannel_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel* grpc_subchannel_ref_from_weak_ref( grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
@ -105,11 +85,35 @@ grpc_subchannel* grpc_subchannel_weak_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref( void grpc_subchannel_weak_unref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_connected_subchannel* grpc_connected_subchannel_ref(
grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_unref(
grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref( void grpc_subchannel_call_ref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref( void grpc_subchannel_call_unref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a subchannel call */
typedef struct {
grpc_polling_entity* pollent;
grpc_slice path;
gpr_timespec start_time;
grpc_millis deadline;
gpr_arena* arena;
grpc_call_context_element* context;
grpc_call_combiner* call_combiner;
} grpc_connected_subchannel_call_args;
grpc_error* grpc_connected_subchannel_create_call(
grpc_connected_subchannel* connected_subchannel,
const grpc_connected_subchannel_call_args* args,
grpc_subchannel_call** subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
grpc_connected_subchannel* subchannel, grpc_transport_op* op);
/** poll the current connectivity state of a channel */ /** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel* channel, grpc_error** error); grpc_subchannel* channel, grpc_error** error);
@ -119,12 +123,17 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
void grpc_subchannel_notify_on_state_change( void grpc_subchannel_notify_on_state_change(
grpc_subchannel* channel, grpc_pollset_set* interested_parties, grpc_subchannel* channel, grpc_pollset_set* interested_parties,
grpc_connectivity_state* state, grpc_closure* notify); grpc_connectivity_state* state, grpc_closure* notify);
void grpc_connected_subchannel_notify_on_state_change(
/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected grpc_connected_subchannel* channel, grpc_pollset_set* interested_parties,
* (which may happen before it initially connects or during transient failures) grpc_connectivity_state* state, grpc_closure* notify);
* */ void grpc_connected_subchannel_ping(grpc_connected_subchannel* channel,
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> grpc_closure* on_initiate,
grpc_subchannel_get_connected_subchannel(grpc_subchannel* c); grpc_closure* on_ack);
/** retrieve the grpc_connected_subchannel - or NULL if called before
the subchannel becomes connected */
grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
grpc_subchannel* subchannel);
/** return the subchannel index key for \a subchannel */ /** return the subchannel index key for \a subchannel */
const grpc_subchannel_key* grpc_subchannel_get_key( const grpc_subchannel_key* grpc_subchannel_get_key(

@ -676,42 +676,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
GPR_ASSERT(gpr_time_cmp(deadline, now) > 0); GPR_ASSERT(gpr_time_cmp(deadline, now) > 0);
} }
TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
const int kNumServers = 3;
StartServers(kNumServers);
const auto ports = GetServersPorts();
ResetStub(ports, "round_robin");
SetNextResolution(ports);
for (size_t i = 0; i < kNumServers; ++i) WaitForServer(i);
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk();
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
// One request should have gone to each server.
for (size_t i = 0; i < servers_.size(); ++i) {
EXPECT_EQ(1, servers_[i]->service_.request_count());
}
const auto pre_death = servers_[0]->service_.request_count();
// Kill the first server.
servers_[0]->Shutdown(true);
// Client request still succeed. May need retrying if RR had returned a pick
// before noticing the change in the server's connectivity.
while (!SendRpc())
; // Retry until success.
// Send a bunch of RPCs that should succeed.
for (int i = 0; i < 10 * kNumServers; ++i) CheckRpcSendOk();
const auto post_death = servers_[0]->service_.request_count();
// No requests have gone to the deceased server.
EXPECT_EQ(pre_death, post_death);
// Bring the first server back up.
servers_[0].reset(new ServerData(server_host_, ports[0]));
// Requests should start arriving at the first server either right away (if
// the server managed to start before the RR policy retried the subchannel) or
// after the subchannel retry delay otherwise (RR's subchannel retried before
// the server was fully back up).
WaitForServer(0);
}
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save