From 149f09da97393cb95ab715bbd823eb0ddc948c83 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 17 Nov 2016 20:43:10 -0800 Subject: [PATCH] Rewrote connectivity status logic for gRPC LB --- src/core/ext/lb_policy/grpclb/grpclb.c | 168 +++++++++++++++++++------ test/cpp/grpclb/grpclb_test.cc | 5 + 2 files changed, 138 insertions(+), 35 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index b7f46984008..e094fdc7996 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -467,6 +467,65 @@ static grpc_lb_addresses *process_serverlist( return lb_addresses; } +/* returns true if the new RR policy should replace the current one, if any */ +static bool update_lb_connectivity_status_locked( + grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { + grpc_error *curr_state_error; + const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check( + &glb_policy->state_tracker, &curr_state_error); + + /* The new connectivity status is a function of the previous one and the new + * input coming from the status of the RR policy. + * + * old state (grpclb's) + * | + * v || I | C | R | TF | SD | <- new state (RR's) + * ===++====+=====+=====+======+======+ + * I || I | C | R | I | I | + * ---++----+-----+-----+------+------+ + * C || I | C | R | C | C | + * ---++----+-----+-----+------+------+ + * R || I | C | R | R | R | + * ---++----+-----+-----+------+------+ + * TF || I | C | R | TF | TF | + * ---++----+-----+-----+------+------+ + * SD || NA | NA | NA | NA | NA | (*) + * ---++----+-----+-----+------+------+ + * + * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to + * the previous RR instance. + * + * Note that the status is never updated to SHUTDOWN as a result of calling + * this function. Only glb_shutdown() has the power to set that state. + * + * (*) This function mustn't be called during shutting down. */ + GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); + + switch (new_rr_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: + GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE); + return false; /* don't replace the RR policy */ + case GRPC_CHANNEL_INIT: + case GRPC_CHANNEL_IDLE: + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_READY: + GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE); + } + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Setting grpclb's state to %s from new RR policy %p state.", + grpc_connectivity_state_name(new_rr_state), + (void *)glb_policy->rr_policy); + } + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, + new_rr_state, GRPC_ERROR_REF(new_rr_state_error), + "update_lb_connectivity_status_locked"); + return true; +} + /* perform a pick over \a rr_policy. Given that a pick can return immediately * (ignoring its completion callback) we need to perform the cleanups this * callback would be otherwise resposible for */ @@ -529,49 +588,81 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); /* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy, grpc_error *error) { + glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy); - } - if (glb_policy->rr_policy != NULL) { - /* if we are phasing out an existing RR instance, unref it. */ - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); - } + if (glb_policy->shutting_down) return; + + grpc_lb_policy *old_rr_policy = glb_policy->rr_policy; glb_policy->rr_policy = create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); + if (glb_policy->rr_policy == NULL) { + gpr_log(GPR_ERROR, + "Failure creating a RoundRobin policy for serverlist update with " + "%lu entries. The previous RR instance (%p), if any, will continue " + "to be used. Future updates from the LB will attempt to create new " + "instances.", + (unsigned long)glb_policy->serverlist->num_servers, + (void *)old_rr_policy); + /* restore the old policy */ + glb_policy->rr_policy = old_rr_policy; + return; + } + + grpc_error *new_rr_state_error = NULL; + const grpc_connectivity_state new_rr_state = + grpc_lb_policy_check_connectivity(exec_ctx, glb_policy->rr_policy, + &new_rr_state_error); + /* Connectivity state is a function of the new RR policy just created */ + const bool replace_old_rr = update_lb_connectivity_status_locked( + exec_ctx, glb_policy, new_rr_state, new_rr_state_error); + + if (!replace_old_rr) { + /* dispose of the new RR policy that won't be used after all */ + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, + "rr_handover_no_replace"); + /* restore the old policy */ + glb_policy->rr_policy = old_rr_policy; + return; + } + if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy); + gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy); } - GPR_ASSERT(glb_policy->rr_policy != NULL); + if (old_rr_policy != NULL) { + /* if we are phasing out an existing RR instance, unref it. */ + GRPC_LB_POLICY_UNREF(exec_ctx, old_rr_policy, "rr_handover"); + } + + /* Add the gRPC LB's interested_parties pollset_set to that of the newly + * created RR policy. This will make the RR policy progress upon activity on + * gRPC LB, which in turn is tied to the application's call */ grpc_pollset_set_add_pollset_set(exec_ctx, glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); + /* Allocate the data for the tracking of the new RR policy's connectivity. + * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = gpr_malloc(sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, rr_connectivity); rr_connectivity->glb_policy = glb_policy; - rr_connectivity->state = grpc_lb_policy_check_connectivity( - exec_ctx, glb_policy->rr_policy, &error); + rr_connectivity->state = new_rr_state; - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_connectivity->state, GRPC_ERROR_REF(error), - "rr_handover"); - /* subscribe */ + /* Subscribe to changes to the connectivity of the new RR */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); - /* flush pending ops */ + /* Update picks and pings in wait */ pending_pick *pp; while ((pp = glb_policy->pending_picks)) { glb_policy->pending_picks = pp->next; @@ -602,28 +693,35 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - /* If shutdown or error free the arg. Rely on the rest of the code to set the - * right grpclb status. */ - rr_connectivity_data *rr_conn_data = arg; - glb_lb_policy *glb_policy = rr_conn_data->glb_policy; - gpr_mu_lock(&glb_policy->mu); + rr_connectivity_data *rr_connectivity = arg; + glb_lb_policy *glb_policy = rr_connectivity->glb_policy; - if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN && - !glb_policy->shutting_down) { - /* RR not shutting down. Mimic the RR's policy state */ - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, GRPC_ERROR_REF(error), - "rr_connectivity_cb"); - /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ + gpr_mu_lock(&glb_policy->mu); + const bool shutting_down = glb_policy->shutting_down; + grpc_lb_policy *maybe_unref = NULL; + GRPC_ERROR_REF(error); + + if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) { + /* RR policy shutting down. Don't renew subscription and free the arg of + * this callback. In addition we need to stash away the current policy to + * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last + * one, the policy would be destroyed, alongside the lock, which would + * result in a use-after-free */ + maybe_unref = &glb_policy->base; + gpr_free(rr_connectivity); + } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */ + update_lb_connectivity_status_locked(exec_ctx, glb_policy, + rr_connectivity->state, error); + /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_conn_data->state, - &rr_conn_data->on_change); - } else { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "rr_connectivity_cb"); - gpr_free(rr_conn_data); + &rr_connectivity->state, + &rr_connectivity->on_change); } gpr_mu_unlock(&glb_policy->mu); + if (maybe_unref != NULL) { + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, maybe_unref, "rr_connectivity_cb"); + } + GRPC_ERROR_UNREF(error); } static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, @@ -1133,7 +1231,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, /* and update the copy in the glb_lb_policy instance */ glb_policy->serverlist = serverlist; - rr_handover_locked(exec_ctx, glb_policy, error); + rr_handover_locked(exec_ctx, glb_policy); } } else { if (grpc_lb_glb_trace) { diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 175786332b3..61309805db8 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -79,6 +79,9 @@ extern "C" { // - Test against a non-LB server. // - Random LB server closing the stream unexpectedly. // - Test using DNS-resolvable names (localhost?) +// - Test handling of creation of faulty RR instance by having the LB return a +// serverlist with non-existent backends after having initially returned a +// valid one. // // Findings from end to end testing to be covered here: // - Handling of LB servers restart, including reconnection after backing-off @@ -521,6 +524,8 @@ static void perform_request(client_fixture *cf) { CQ_EXPECT_COMPLETION(cqv, tag(2), 1); cq_verify(cqv); GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD)); + GPR_ASSERT(grpc_channel_check_connectivity_state( + cf->client, 0 /* try to connect */) == GRPC_CHANNEL_READY); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(response_payload_recv);