Merge pull request #8787 from dgquintas/grpclb_reworked_conn_state

Rewrote connectivity status logic for gRPC LB
Branch: pull/8804/head
Author: David G. Quintas, committed by GitHub
Commit: 8d02f9b988
Files changed (lines changed per file):
  1.   1   grpc.def
  2. 171   src/core/ext/lb_policy/grpclb/grpclb.c
  3.   2   src/ruby/ext/grpc/rb_grpc_imports.generated.c
  4.   3   src/ruby/ext/grpc/rb_grpc_imports.generated.h
  5.  10   test/cpp/grpclb/grpclb_test.cc

@@ -151,6 +151,7 @@ EXPORTS
     gpr_empty_slice
     grpc_slice_cmp
     grpc_slice_str_cmp
+    grpc_slice_is_equivalent
     grpc_slice_buffer_init
     grpc_slice_buffer_destroy
     grpc_slice_buffer_add

@@ -475,6 +475,68 @@ static grpc_lb_addresses *process_serverlist_locked(
   return lb_addresses;
 }
 
+/* Returns true if the new RR policy should replace the current one, if any */
+static bool update_lb_connectivity_status_locked(
+    grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+    grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) {
+  grpc_error *curr_state_error;
+  const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(
+      &glb_policy->state_tracker, &curr_state_error);
+
+  /* The new connectivity status is a function of the previous one and the new
+   * input coming from the status of the RR policy.
+   *
+   *  current state (grpclb's)
+   *  |
+   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
+   *  ===++====+=====+=====+======+======+
+   *   I || I  |  C  |  R  | [I]  | [I]  |
+   *  ---++----+-----+-----+------+------+
+   *   C || I  |  C  |  R  | [C]  | [C]  |
+   *  ---++----+-----+-----+------+------+
+   *   R || I  |  C  |  R  | [R]  | [R]  |
+   *  ---++----+-----+-----+------+------+
+   *  TF || I  |  C  |  R  | [TF] | [TF] |
+   *  ---++----+-----+-----+------+------+
+   *  SD || NA | NA  | NA  |  NA  |  NA  | (*)
+   *  ---++----+-----+-----+------+------+
+   *
+   * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
+   * is the current state of grpclb, which is left untouched.
+   *
+   * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
+   * the previous RR instance.
+   *
+   * Note that the status is never updated to SHUTDOWN as a result of calling
+   * this function. Only glb_shutdown() has the power to set that state.
+   *
+   * (*) This function mustn't be called while shutting down. */
+  GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
+
+  switch (new_rr_state) {
+    case GRPC_CHANNEL_TRANSIENT_FAILURE:
+    case GRPC_CHANNEL_SHUTDOWN:
+      GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE);
+      return false; /* don't replace the RR policy */
+    case GRPC_CHANNEL_INIT:
+    case GRPC_CHANNEL_IDLE:
+    case GRPC_CHANNEL_CONNECTING:
+    case GRPC_CHANNEL_READY:
+      GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE);
+  }
+
+  if (grpc_lb_glb_trace) {
+    gpr_log(GPR_INFO,
+            "Setting grpclb's state to %s from new RR policy %p state.",
+            grpc_connectivity_state_name(new_rr_state),
+            (void *)glb_policy->rr_policy);
+  }
+  grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
+                              new_rr_state, GRPC_ERROR_REF(new_rr_state_error),
+                              "update_lb_connectivity_status_locked");
+  return true;
+}
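
Stripped of the surrounding plumbing, the transition table above boils down to a single predicate: adopt the new RR policy's state unless that state is TRANSIENT_FAILURE or SHUTDOWN. A minimal standalone sketch of just that rule (an editor's illustration under stated assumptions: channel_state and should_replace_rr are hypothetical stand-ins, not gRPC API):

#include <stdbool.h>

/* Hypothetical mirror of grpc_connectivity_state, for illustration only. */
typedef enum {
  CHANNEL_INIT,
  CHANNEL_IDLE,
  CHANNEL_CONNECTING,
  CHANNEL_READY,
  CHANNEL_TRANSIENT_FAILURE,
  CHANNEL_SHUTDOWN
} channel_state;

/* Encodes the table: every row agrees within each column, so the decision
 * depends only on the new RR state. Bracketed columns (TF, SD) mean "keep
 * the old RR policy". */
static bool should_replace_rr(channel_state new_rr_state) {
  return new_rr_state != CHANNEL_TRANSIENT_FAILURE &&
         new_rr_state != CHANNEL_SHUTDOWN;
}

Because all rows agree per column, the current grpclb state never enters the decision; it is only asserted not to be SHUTDOWN.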
 /* perform a pick over \a rr_policy. Given that a pick can return immediately
  * (ignoring its completion callback) we need to perform the cleanups this
  * callback would otherwise be responsible for */
@@ -537,49 +599,84 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
                                         grpc_error *error);
 
 /* glb_policy->rr_policy may be NULL (initial handover) */
 static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
-                               glb_lb_policy *glb_policy, grpc_error *error) {
+                               glb_lb_policy *glb_policy) {
   GPR_ASSERT(glb_policy->serverlist != NULL &&
              glb_policy->serverlist->num_servers > 0);
+
+  if (glb_policy->shutting_down) return;
+
+  grpc_lb_policy *new_rr_policy =
+      create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
+  if (new_rr_policy == NULL) {
+    gpr_log(GPR_ERROR,
+            "Failure creating a RoundRobin policy for serverlist update with "
+            "%lu entries. The previous RR instance (%p), if any, will continue "
+            "to be used. Future updates from the LB will attempt to create new "
+            "instances.",
+            (unsigned long)glb_policy->serverlist->num_servers,
+            (void *)glb_policy->rr_policy);
+    return;
+  }
+
+  grpc_error *new_rr_state_error = NULL;
+  const grpc_connectivity_state new_rr_state =
+      grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy,
+                                        &new_rr_state_error);
+  /* Connectivity state is a function of the new RR policy just created */
+  const bool replace_old_rr = update_lb_connectivity_status_locked(
+      exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
+
+  if (!replace_old_rr) {
+    /* dispose of the new RR policy that won't be used after all */
+    GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace");
+    if (grpc_lb_glb_trace) {
+      gpr_log(GPR_INFO,
+              "Keeping old RR policy (%p) despite new serverlist: new RR "
+              "policy was in %s connectivity state.",
+              (void *)glb_policy->rr_policy,
+              grpc_connectivity_state_name(new_rr_state));
+    }
+    return;
+  }
+
   if (grpc_lb_glb_trace) {
-    gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy);
+    gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)",
+            (void *)new_rr_policy, (void *)glb_policy->rr_policy);
   }
+
   if (glb_policy->rr_policy != NULL) {
     /* if we are phasing out an existing RR instance, unref it. */
     GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
   }
-  glb_policy->rr_policy =
-      create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
-  if (grpc_lb_glb_trace) {
-    gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy);
-  }
+
+  /* Finally update the RR policy to the newly created one */
+  glb_policy->rr_policy = new_rr_policy;
   GPR_ASSERT(glb_policy->rr_policy != NULL);
+
+  /* Add the gRPC LB's interested_parties pollset_set to that of the newly
+   * created RR policy. This will make the RR policy progress upon activity on
+   * gRPC LB, which in turn is tied to the application's call */
   grpc_pollset_set_add_pollset_set(exec_ctx,
                                    glb_policy->rr_policy->interested_parties,
                                    glb_policy->base.interested_parties);
+
+  /* Allocate the data for the tracking of the new RR policy's connectivity.
+   * It'll be deallocated in glb_rr_connectivity_changed() */
   rr_connectivity_data *rr_connectivity =
       gpr_malloc(sizeof(rr_connectivity_data));
   memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
   grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
                     rr_connectivity);
   rr_connectivity->glb_policy = glb_policy;
-  rr_connectivity->state = grpc_lb_policy_check_connectivity(
-      exec_ctx, glb_policy->rr_policy, &error);
+  rr_connectivity->state = new_rr_state;
-  grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
-                              rr_connectivity->state, GRPC_ERROR_REF(error),
-                              "rr_handover");
-  /* subscribe */
+
+  /* Subscribe to changes to the connectivity of the new RR */
   GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
   grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
                                         &rr_connectivity->state,
                                         &rr_connectivity->on_change);
   grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
-  /* flush pending ops */
+
+  /* Update picks and pings in wait */
   pending_pick *pp;
   while ((pp = glb_policy->pending_picks)) {
     glb_policy->pending_picks = pp->next;
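
Before the pending picks are drained, note the overall shape of rr_handover_locked: build the replacement first, probe it, and only then retire the old instance. A condensed sketch of that create/probe/commit pattern (an editor's illustration; policy_t, policy_create and the usable flag are hypothetical stand-ins, not the grpc_lb_policy API):

#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for grpc_lb_policy: just enough state to show the
 * replace-or-discard decision. */
typedef struct { bool usable; } policy_t;

static policy_t *policy_create(bool usable) {
  policy_t *p = malloc(sizeof(*p));
  if (p != NULL) p->usable = usable;
  return p;
}

/* Replace *current only if a usable replacement could be built; otherwise the
 * old instance keeps serving, mirroring rr_handover_locked(). */
static void handover(policy_t **current, bool new_one_usable) {
  policy_t *fresh = policy_create(new_one_usable);
  if (fresh == NULL) return; /* creation failed: keep old, retry on next update */
  if (!fresh->usable) {      /* e.g. TRANSIENT_FAILURE: discard the new one */
    free(fresh);
    return;
  }
  free(*current); /* phase out the old instance, if any (free(NULL) is a no-op) */
  *current = fresh;
}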
@@ -610,28 +707,36 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
 
 static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
                                         grpc_error *error) {
-  /* If shutdown or error free the arg. Rely on the rest of the code to set the
-   * right grpclb status. */
-  rr_connectivity_data *rr_conn_data = arg;
-  glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
-  gpr_mu_lock(&glb_policy->mu);
+  rr_connectivity_data *rr_connectivity = arg;
+  glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
 
-  if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
-      !glb_policy->shutting_down) {
-    /* RR not shutting down. Mimic the RR's policy state */
-    grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
-                                rr_conn_data->state, GRPC_ERROR_REF(error),
-                                "rr_connectivity_cb");
-    /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
+  gpr_mu_lock(&glb_policy->mu);
+  const bool shutting_down = glb_policy->shutting_down;
+  bool unref_needed = false;
+  GRPC_ERROR_REF(error);
+
+  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
+    /* RR policy shutting down. Don't renew subscription and free the arg of
+     * this callback. In addition we need to stash away the current policy to
+     * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
+     * one, the policy would be destroyed, alongside the lock, which would
+     * result in a use-after-free */
+    unref_needed = true;
+    gpr_free(rr_connectivity);
+  } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
+    update_lb_connectivity_status_locked(exec_ctx, glb_policy,
+                                         rr_connectivity->state, error);
+    /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
     grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
-                                          &rr_conn_data->state,
-                                          &rr_conn_data->on_change);
-  } else {
+                                          &rr_connectivity->state,
+                                          &rr_connectivity->on_change);
+  }
+  gpr_mu_unlock(&glb_policy->mu);
+  if (unref_needed) {
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                               "rr_connectivity_cb");
-    gpr_free(rr_conn_data);
   }
-  gpr_mu_unlock(&glb_policy->mu);
+  GRPC_ERROR_UNREF(error);
 }
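
The unref_needed flag above implements a pattern worth naming: never drop the last reference to an object while holding a lock that lives inside it, or the unlock touches freed memory. A minimal pthreads sketch of the same idea (an editor's illustration; obj_t and its non-atomic refcount are hypothetical simplifications of the weak-ref machinery):

#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct {
  pthread_mutex_t mu;
  int refs; /* a real implementation would use an atomic counter */
} obj_t;

/* Returns true if the caller must destroy the object. */
static bool obj_unref(obj_t *o) { return --o->refs == 0; }

static void on_event(obj_t *o, bool shutting_down) {
  bool unref_needed = false;
  pthread_mutex_lock(&o->mu);
  if (shutting_down) {
    /* Don't drop the ref here: if it were the last one, destroying the
     * object would also destroy o->mu while we still hold it. */
    unref_needed = true;
  }
  pthread_mutex_unlock(&o->mu); /* release BEFORE the potential destruction */
  if (unref_needed && obj_unref(o)) {
    pthread_mutex_destroy(&o->mu);
    free(o);
  }
}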
 static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,

@@ -1141,7 +1246,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
         /* and update the copy in the glb_lb_policy instance */
         glb_policy->serverlist = serverlist;
 
-        rr_handover_locked(exec_ctx, glb_policy, error);
+        rr_handover_locked(exec_ctx, glb_policy);
       }
     } else {
       if (grpc_lb_glb_trace) {

@@ -189,6 +189,7 @@ grpc_slice_split_head_type grpc_slice_split_head_import;
 gpr_empty_slice_type gpr_empty_slice_import;
 grpc_slice_cmp_type grpc_slice_cmp_import;
 grpc_slice_str_cmp_type grpc_slice_str_cmp_import;
+grpc_slice_is_equivalent_type grpc_slice_is_equivalent_import;
 grpc_slice_buffer_init_type grpc_slice_buffer_init_import;
 grpc_slice_buffer_destroy_type grpc_slice_buffer_destroy_import;
 grpc_slice_buffer_add_type grpc_slice_buffer_add_import;
@@ -464,6 +465,7 @@ void grpc_rb_load_imports(HMODULE library) {
   gpr_empty_slice_import = (gpr_empty_slice_type) GetProcAddress(library, "gpr_empty_slice");
   grpc_slice_cmp_import = (grpc_slice_cmp_type) GetProcAddress(library, "grpc_slice_cmp");
   grpc_slice_str_cmp_import = (grpc_slice_str_cmp_type) GetProcAddress(library, "grpc_slice_str_cmp");
+  grpc_slice_is_equivalent_import = (grpc_slice_is_equivalent_type) GetProcAddress(library, "grpc_slice_is_equivalent");
   grpc_slice_buffer_init_import = (grpc_slice_buffer_init_type) GetProcAddress(library, "grpc_slice_buffer_init");
   grpc_slice_buffer_destroy_import = (grpc_slice_buffer_destroy_type) GetProcAddress(library, "grpc_slice_buffer_destroy");
   grpc_slice_buffer_add_import = (grpc_slice_buffer_add_type) GetProcAddress(library, "grpc_slice_buffer_add");

@@ -518,6 +518,9 @@ extern grpc_slice_cmp_type grpc_slice_cmp_import;
 typedef int(*grpc_slice_str_cmp_type)(grpc_slice a, const char *b);
 extern grpc_slice_str_cmp_type grpc_slice_str_cmp_import;
 #define grpc_slice_str_cmp grpc_slice_str_cmp_import
+typedef int(*grpc_slice_is_equivalent_type)(grpc_slice a, grpc_slice b);
+extern grpc_slice_is_equivalent_type grpc_slice_is_equivalent_import;
+#define grpc_slice_is_equivalent grpc_slice_is_equivalent_import
 typedef void(*grpc_slice_buffer_init_type)(grpc_slice_buffer *sb);
 extern grpc_slice_buffer_init_type grpc_slice_buffer_init_import;
 #define grpc_slice_buffer_init grpc_slice_buffer_init_import
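
For readers unfamiliar with the generated Ruby shims: each exported symbol gets a function-pointer typedef, a global pointer, and a GetProcAddress lookup, which is exactly what the three hunks above add for grpc_slice_is_equivalent. A minimal sketch of the idiom (an editor's illustration; my_add and mylib.dll are made up, only LoadLibraryA/GetProcAddress are real Win32 API):

#include <windows.h>

/* One typedef + one global pointer per imported symbol, as in the
 * generated rb_grpc_imports files. */
typedef int (*my_add_type)(int a, int b);
static my_add_type my_add_import;
#define my_add my_add_import /* callers then use my_add(2, 3) as if statically linked */

static int load_imports(void) {
  HMODULE lib = LoadLibraryA("mylib.dll"); /* hypothetical library */
  if (lib == NULL) return 0;
  my_add_import = (my_add_type)GetProcAddress(lib, "my_add");
  return my_add_import != NULL;
}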

@@ -79,6 +79,9 @@ extern "C" {
 // - Test against a non-LB server.
 // - Random LB server closing the stream unexpectedly.
 // - Test using DNS-resolvable names (localhost?)
+// - Test handling of creation of faulty RR instance by having the LB return a
+//   serverlist with non-existent backends after having initially returned a
+//   valid one.
 //
 // Findings from end to end testing to be covered here:
 // - Handling of LB servers restart, including reconnection after backing-off
@@ -342,9 +345,8 @@ static void start_backend_server(server_fixture *sf) {
     }
     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
     const string expected_token =
-        strlen(sf->lb_token_prefix) == 0
-            ? ""
-            : sf->lb_token_prefix + std::to_string(sf->port);
+        strlen(sf->lb_token_prefix) == 0 ? "" : sf->lb_token_prefix +
+                                                    std::to_string(sf->port);
     GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token",
                                  expected_token.c_str()));
@@ -522,6 +524,8 @@ static void perform_request(client_fixture *cf) {
     CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
     cq_verify(cqv);
     GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD));
+    GPR_ASSERT(grpc_channel_check_connectivity_state(
+                   cf->client, 0 /* try to connect */) == GRPC_CHANNEL_READY);
 
     grpc_byte_buffer_destroy(request_payload);
     grpc_byte_buffer_destroy(response_payload_recv);
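
The new assertion relies on grpc_channel_check_connectivity_state from <grpc/grpc.h>, which reports the channel's current state without blocking; with try_to_connect set to 0 it only observes, never kicks off a connection attempt. A small usage sketch under that assumption (the helper name channel_is_ready is ours, not part of the test):

#include <grpc/grpc.h>

/* Observe (don't trigger) the channel's connectivity; after a successful
 * RPC through grpclb the test expects the channel to be READY. */
static int channel_is_ready(grpc_channel *channel) {
  return grpc_channel_check_connectivity_state(
             channel, 0 /* try_to_connect */) == GRPC_CHANNEL_READY;
}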
