From 98da61ba109405f173fbe8e11d169ab41fc407fb Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Sat, 29 Oct 2016 08:46:31 +0200 Subject: [PATCH 1/6] gRPC LB fixes from end two end testing --- src/core/ext/lb_policy/grpclb/grpclb.c | 632 +++++++++--------- .../ext/lb_policy/round_robin/round_robin.c | 39 +- .../client/secure/secure_channel_create.c | 2 +- .../security/transport/security_connector.c | 4 +- 4 files changed, 364 insertions(+), 313 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 6da4febf266..d2af27e7bfc 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -43,30 +43,26 @@ * policy to select from this list of LB server backends. * * The first time the policy gets a request for a pick, a ping, or to exit the - * idle state, \a query_for_backends() is called. It creates an instance of \a - * lb_client_data, an internal struct meant to contain the data associated with - * the internal communication with the LB server. This instance is created via - * \a lb_client_data_create(). There, the call over lb_channel to pick-first - * from {a1..an} is created, the \a LoadBalancingRequest message is assembled - * and all necessary callbacks for the progress of the internal call configured. + * idle state, \a query_for_backends_locked() is called. This function sets up + * and initiates the internal communication with the LB server. In particular, + * it's responsible for instantiating the internal *streaming* call to the LB + * server (whichever address from {a1..an} pick-first chose). This call is + * serviced by two callbacks, \a srv_status_rcvd and \a res_rcvd. The former + * will be called when the call to the LB server completes. This can happen if + * the LB server closes the connection or if this policy itself cancels the call + * (for example because it's shutting down). If the call fails with + * UNIMPLEMENTED, the original picks/pings will fail. This signals that there's + * a misconfiguration somewhere: at least one of {a1..an} isn't an LB server, + * which contradicts the LB bit being set. If the internal call times out, the + * usual behavior of pick-first applies, continuing to pick from the list + * {a1..an}. * - * Back in \a query_for_backends(), the internal *streaming* call to the LB - * server (whichever address from {a1..an} pick-first chose) is kicked off. - * It'll progress over the callbacks configured in \a lb_client_data_create() - * (see the field docstrings of \a lb_client_data for more details). - * - * If the call fails with UNIMPLEMENTED, the original call will also fail. - * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB - * server, which contradicts the LB bit being set. If the internal call times - * out, the usual behavior of pick-first applies, continuing to pick from the - * list {a1..an}. - * - * Upon sucesss, a \a LoadBalancingResponse is expected in \a res_recv_cb. An - * invalid one results in the termination of the streaming call. A new streaming - * call should be created if possible, failing the original call otherwise. - * For a valid \a LoadBalancingResponse, the server list of actual backends is - * extracted. A Round Robin policy will be created from this list. There are two - * possible scenarios: + * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a + * res_recv. An invalid one results in the termination of the streaming call. A + * new streaming call should be created if possible, failing the original call + * otherwise. For a valid \a LoadBalancingResponse, the server list of actual + * backends is extracted. A Round Robin policy will be created from this list. + * There are two possible scenarios: * * 1. This is the first server list received. There was no previous instance of * the Round Robin policy. \a rr_handover_locked() will instantiate the RR @@ -120,12 +116,20 @@ #include "src/core/ext/lb_policy/grpclb/grpclb.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/backoff.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/static_metadata.h" +#define BACKOFF_MULTIPLIER 1.6 +#define BACKOFF_JITTER 0.2 +#define BACKOFF_MIN_SECONDS 10 +#define BACKOFF_MAX_SECONDS 60 + int grpc_lb_glb_trace = 0; /* add lb_token of selected subchannel (address) to the call's initial @@ -174,13 +178,12 @@ typedef struct wrapped_rr_closure_arg { static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { wrapped_rr_closure_arg *wc_arg = arg; - if (wc_arg->rr_policy != NULL) { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), + NULL); + + if (wc_arg->rr_policy != NULL) { /* if target is NULL, no pick has been made by the RR policy (eg, all * addresses failed to connect). There won't be any user_data/token * available */ @@ -189,10 +192,12 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, wc_arg->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); } + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); } - GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), - NULL); GPR_ASSERT(wc_arg->free_when_done != NULL); gpr_free(wc_arg->free_when_done); } @@ -264,7 +269,6 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { * glb_lb_policy */ typedef struct rr_connectivity_data rr_connectivity_data; -struct lb_client_data; static const grpc_lb_policy_vtable glb_lb_policy_vtable; typedef struct glb_lb_policy { /** base policy: must be first */ @@ -296,20 +300,53 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; - /** addresses from \a serverlist */ - grpc_lb_addresses *addresses; - /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; /** list of pings that are waiting on RR's policy connectivity */ pending_ping *pending_pings; - /** client data associated with the LB server communication */ - struct lb_client_data *lb_client; + bool shutting_down; + + /************************************************************/ + /* client data associated with the LB server communication */ + /************************************************************/ + /* called once initial metadata's been sent */ + grpc_closure md_sent; + + /* called once the LoadBalanceRequest has been sent to the LB server. See + * src/proto/grpc/.../load_balancer.proto */ + grpc_closure req_sent; + + /* A response from the LB server has been received (or error). Process it */ + grpc_closure res_rcvd; + + /* ... and the status from the LB server has been received */ + grpc_closure srv_status_rcvd; + + grpc_call *lb_call; /* streaming call to the LB server, */ + + grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */ + grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */ + + /* what's being sent to the LB server. Note that its value may vary if the LB + * server indicates a redirect. */ + grpc_byte_buffer *request_payload; + + /* response from the LB server, if any. Processed in res_recv_cb() */ + grpc_byte_buffer *response_payload; + + /* the call's status and status detailset in srv_status_rcvd_cb() */ + grpc_status_code lb_call_status; + char *lb_call_status_details; + size_t lb_call_status_details_capacity; + + /** LB call retry backoff state */ + gpr_backoff lb_call_backoff_state; + + /** LB call retry timer */ + grpc_timer lb_call_retry_timer; - /** for tracking of the RR connectivity */ - rr_connectivity_data *rr_connectivity; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -427,7 +464,6 @@ static grpc_lb_addresses *process_serverlist( ++addr_idx; } GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; } @@ -448,7 +484,7 @@ static bool pick_from_internal_rr_locked( gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); /* add the load reporting initial metadata */ initial_metadata_add_lb_token(pick_args->initial_metadata, @@ -461,7 +497,6 @@ static bool pick_from_internal_rr_locked( * pending pick list inside the RR policy (glb_policy->rr_policy). * Eventually, wrapped_on_complete will be called, which will -among other * things- add the LB token to the call's initial metadata */ - return pick_done; } @@ -470,54 +505,69 @@ static grpc_lb_policy *create_rr_locked( glb_lb_policy *glb_policy) { GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - if (glb_policy->addresses != NULL) { - /* dispose of the previous version */ - grpc_lb_addresses_destroy(glb_policy->addresses); - } - glb_policy->addresses = process_serverlist(serverlist); - grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; + grpc_lb_addresses *addresses = process_serverlist(serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; - const grpc_arg arg = - grpc_lb_addresses_create_channel_arg(glb_policy->addresses); + const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); args.args = grpc_channel_args_copy_and_add_and_remove( glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); + GPR_ASSERT(rr != NULL); + grpc_lb_addresses_destroy(addresses); grpc_channel_args_destroy(args.args); - return rr; } +static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +/* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_error *error) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy); + } + if (glb_policy->rr_policy != NULL) { + /* if we are phasing out an existing RR instance, unref it. */ + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover_locked"); + } + glb_policy->rr_policy = create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy); } + GPR_ASSERT(glb_policy->rr_policy != NULL); grpc_pollset_set_add_pollset_set(exec_ctx, glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); - glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity( + + rr_connectivity_data *rr_connectivity = + gpr_malloc(sizeof(rr_connectivity_data)); + memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); + grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, + rr_connectivity); + rr_connectivity->glb_policy = glb_policy; + rr_connectivity->state = grpc_lb_policy_check_connectivity( exec_ctx, glb_policy->rr_policy, &error); - grpc_lb_policy_notify_on_state_change( - exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state, - &glb_policy->rr_connectivity->on_change); + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - glb_policy->rr_connectivity->state, - GRPC_ERROR_REF(error), "rr_handover"); + rr_connectivity->state, GRPC_ERROR_REF(error), + "rr_handover"); + /* subscribe */ + grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); /* flush pending ops */ @@ -551,35 +601,24 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + /* If shutdown or error free the arg. Rely on the rest of the code to set the + * right grpclb status. */ rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; - if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) { - if (glb_policy->serverlist != NULL) { - /* a RR policy is shutting down but there's a serverlist available -> - * perform a handover */ - gpr_mu_lock(&glb_policy->mu); - rr_handover_locked(exec_ctx, glb_policy, error); - gpr_mu_unlock(&glb_policy->mu); - } else { - /* shutting down and no new serverlist available. Bail out. */ - gpr_free(rr_conn_data); - } + if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN) { + gpr_mu_lock(&glb_policy->mu); + /* RR not shutting down. Mimic the RR's policy state */ + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, + rr_conn_data->state, GRPC_ERROR_REF(error), + "glb_rr_connectivity_changed"); + /* resubscribe */ + grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, + &rr_conn_data->state, + &rr_conn_data->on_change); + gpr_mu_unlock(&glb_policy->mu); } else { - if (error == GRPC_ERROR_NONE) { - gpr_mu_lock(&glb_policy->mu); - /* RR not shutting down. Mimic the RR's policy state */ - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, GRPC_ERROR_REF(error), - "glb_rr_connectivity_changed"); - /* resubscribe */ - grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_conn_data->state, - &rr_conn_data->on_change); - gpr_mu_unlock(&glb_policy->mu); - } else { /* error */ - gpr_free(rr_conn_data); - } + gpr_free(rr_conn_data); } } @@ -682,18 +721,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, return NULL; } - rr_connectivity_data *rr_connectivity = - gpr_malloc(sizeof(rr_connectivity_data)); - memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, - rr_connectivity); - rr_connectivity->glb_policy = glb_policy; - glb_policy->rr_connectivity = rr_connectivity; - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable); gpr_mu_init(&glb_policy->mu); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); + return &glb_policy->base; } @@ -710,14 +742,13 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } gpr_mu_destroy(&glb_policy->mu); - grpc_lb_addresses_destroy(glb_policy->addresses); gpr_free(glb_policy); } -static void lb_client_data_destroy(struct lb_client_data *lb_client); static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); + glb_policy->shutting_down = true; pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; @@ -741,15 +772,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } if (glb_policy->rr_policy) { - /* unsubscribe */ - grpc_lb_policy_notify_on_state_change( - exec_ctx, glb_policy->rr_policy, NULL, - &glb_policy->rr_connectivity->on_change); GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); } - lb_client_data_destroy(glb_policy->lb_client); - glb_policy->lb_client = NULL; + if (glb_policy->started_picking) { + if (glb_policy->lb_call != NULL) { + grpc_call_cancel(glb_policy->lb_call, NULL); + /* srv_status_rcvd_cb will pick up the cancellation and clean up */ + } + } grpc_connectivity_state_set( exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, @@ -780,17 +811,12 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client); static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); - if (glb_policy->lb_client != NULL) { - /* cancel the call to the load balancer service, if any */ - grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL); - } pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; while (pp != NULL) { @@ -810,18 +836,20 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static void query_for_backends(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy); -static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { +static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy); +static void start_picking_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { glb_policy->started_picking = true; - query_for_backends(exec_ctx, glb_policy); + gpr_backoff_reset(&glb_policy->lb_call_backoff_state); + query_for_backends_locked(exec_ctx, glb_policy); } static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } gpr_mu_unlock(&glb_policy->mu); } @@ -847,8 +875,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (glb_policy->rr_policy != NULL) { if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p", + (void *)glb_policy, (void *)glb_policy->rr_policy); } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); @@ -865,11 +893,17 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, pick_args, target, wc_arg); } else { + if (grpc_lb_glb_trace) { + gpr_log(GPR_DEBUG, + "No RR policy in grpclb instance %p. Adding to grpclb's pending " + "picks", + (void *)(glb_policy)); + } add_pending_pick(&glb_policy->pending_picks, pick_args, target, on_complete); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } pick_done = false; } @@ -898,7 +932,7 @@ static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } else { add_pending_ping(&glb_policy->pending_pings, closure); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } } gpr_mu_unlock(&glb_policy->mu); @@ -916,250 +950,181 @@ static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&glb_policy->mu); } -/* - * lb_client_data - * - * Used internally for the client call to the LB */ -typedef struct lb_client_data { - gpr_mu mu; - - /* called once initial metadata's been sent */ - grpc_closure md_sent; - - /* called once the LoadBalanceRequest has been sent to the LB server. See - * src/proto/grpc/.../load_balancer.proto */ - grpc_closure req_sent; - - /* A response from the LB server has been received (or error). Process it */ - grpc_closure res_rcvd; - - /* After the client has sent a close to the LB server */ - grpc_closure close_sent; - - /* ... and the status from the LB server has been received */ - grpc_closure srv_status_rcvd; - - grpc_call *lb_call; /* streaming call to the LB server, */ - gpr_timespec deadline; /* for the streaming call to the LB server */ - - grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */ - grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */ - - /* what's being sent to the LB server. Note that its value may vary if the LB - * server indicates a redirect. */ - grpc_byte_buffer *request_payload; - - /* response from the LB server, if any. Processed in res_recv_cb() */ - grpc_byte_buffer *response_payload; - - /* the call's status and status detailset in srv_status_rcvd_cb() */ - grpc_status_code status; - char *status_details; - size_t status_details_capacity; - - /* pointer back to the enclosing policy */ - glb_lb_policy *glb_policy; -} lb_client_data; - -static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); - -static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { +static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); +static void lb_client_init(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); - lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data)); - memset(lb_client, 0, sizeof(lb_client_data)); - - gpr_mu_init(&lb_client->mu); - grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client); - - grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client); - grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client); - grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client); - grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client); - - lb_client->deadline = glb_policy->deadline; - /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling * entities from \a client_channel. */ - lb_client->lb_call = grpc_channel_create_pollset_set_call( + glb_policy->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name, - lb_client->deadline, NULL); + glb_policy->deadline, NULL); - grpc_metadata_array_init(&lb_client->initial_metadata_recv); - grpc_metadata_array_init(&lb_client->trailing_metadata_recv); + grpc_metadata_array_init(&glb_policy->initial_metadata_recv); + grpc_metadata_array_init(&glb_policy->trailing_metadata_recv); grpc_grpclb_request *request = grpc_grpclb_request_create(glb_policy->server_name); gpr_slice request_payload_slice = grpc_grpclb_request_encode(request); - lb_client->request_payload = + glb_policy->request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); gpr_slice_unref(request_payload_slice); grpc_grpclb_request_destroy(request); - lb_client->status_details = NULL; - lb_client->status_details_capacity = 0; - lb_client->glb_policy = glb_policy; - return lb_client; + glb_policy->lb_call_status_details = NULL; + glb_policy->lb_call_status_details_capacity = 0; + + grpc_closure_init(&glb_policy->srv_status_rcvd, srv_status_rcvd_cb, + glb_policy); + grpc_closure_init(&glb_policy->res_rcvd, res_recv_cb, glb_policy); + + gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER, + BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, + BACKOFF_MAX_SECONDS * 1000); } -static void lb_client_data_destroy(lb_client_data *lb_client) { - grpc_call_destroy(lb_client->lb_call); - grpc_metadata_array_destroy(&lb_client->initial_metadata_recv); - grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv); +static void lb_client_destroy(glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->lb_call != NULL); + grpc_call_destroy(glb_policy->lb_call); + glb_policy->lb_call = NULL; - grpc_byte_buffer_destroy(lb_client->request_payload); + grpc_metadata_array_destroy(&glb_policy->initial_metadata_recv); + grpc_metadata_array_destroy(&glb_policy->trailing_metadata_recv); - gpr_free(lb_client->status_details); - gpr_mu_destroy(&lb_client->mu); - gpr_free(lb_client); -} -static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) { - return lb_client->lb_call; + grpc_byte_buffer_destroy(glb_policy->request_payload); + gpr_free(glb_policy->lb_call_status_details); } /* * Auxiliary functions and LB client callbacks. */ -static void query_for_backends(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "query_for_backends_locked"); + lb_client_init(glb_policy); + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", + (void *)glb_policy, (void *)glb_policy->lb_call); + } + GPR_ASSERT(glb_policy->lb_call != NULL); - glb_policy->lb_client = lb_client_data_create(glb_policy); grpc_call_error call_error; - grpc_op ops[1]; + grpc_op ops[4]; memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_client->md_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); - op = ops; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = - &glb_policy->lb_client->trailing_metadata_recv; - op->data.recv_status_on_client.status = &glb_policy->lb_client->status; - op->data.recv_status_on_client.status_details = - &glb_policy->lb_client->status_details; - op->data.recv_status_on_client.status_details_capacity = - &glb_policy->lb_client->status_details_capacity; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &glb_policy->initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_client->srv_status_rcvd); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; + GPR_ASSERT(glb_policy->request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = lb_client->request_payload; + op->data.send_message = glb_policy->request_payload; op->flags = 0; op->reserved = NULL; op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->req_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} -static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - - grpc_op ops[2]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; - - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &glb_policy->trailing_metadata_recv; + op->data.recv_status_on_client.status = &glb_policy->lb_call_status; + op->data.recv_status_on_client.status_details = + &glb_policy->lb_call_status_details; + op->data.recv_status_on_client.status_details_capacity = + &glb_policy->lb_call_status_details_capacity; op->flags = 0; op->reserved = NULL; op++; + call_error = grpc_call_start_batch_and_execute(exec_ctx, glb_policy->lb_call, + ops, (size_t)(op - ops), + &glb_policy->srv_status_rcvd); + GPR_ASSERT(GRPC_CALL_OK == call_error); + op = ops; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &lb_client->response_payload; + op->data.recv_message = &glb_policy->response_payload; op->flags = 0; op->reserved = NULL; op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->res_rcvd); + call_error = grpc_call_start_batch_and_execute(exec_ctx, glb_policy->lb_call, + ops, (size_t)(op - ops), + &glb_policy->res_rcvd); GPR_ASSERT(GRPC_CALL_OK == call_error); } static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; + glb_lb_policy *glb_policy = arg; + grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - if (lb_client->response_payload != NULL) { + if (glb_policy->response_payload != NULL) { + gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside - * lb_client->response_payload, for a serverlist. */ + * glb_policy->response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload); + grpc_byte_buffer_reader_init(&bbr, glb_policy->response_payload); gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); - grpc_byte_buffer_destroy(lb_client->response_payload); + grpc_byte_buffer_destroy(glb_policy->response_payload); grpc_grpclb_serverlist *serverlist = grpc_grpclb_response_parse_serverlist(response_slice); if (serverlist != NULL) { + GPR_ASSERT(glb_policy->lb_call != NULL); gpr_slice_unref(response_slice); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Serverlist with %lu servers received", (unsigned long)serverlist->num_servers); + /* TODO(dgq): this needs to work with ipv6. */ + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + struct sockaddr_in *sa = (struct sockaddr_in *)&addr.addr; + addr.len = sizeof(struct sockaddr_in); + sa->sin_family = AF_INET; + sa->sin_port = htons((uint16_t)serverlist->servers[i]->port); + memcpy(&sa->sin_addr, serverlist->servers[i]->ip_address.bytes, + serverlist->servers[i]->ip_address.size); + char *ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); + gpr_free(ipport); + } } /* update serverlist */ if (serverlist->num_servers > 0) { - gpr_mu_lock(&lb_client->glb_policy->mu); - if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist, - serverlist)) { + gpr_mu_lock(&glb_policy->mu); + if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Incoming server list identical to current, ignoring."); } } else { /* new serverlist */ - if (lb_client->glb_policy->serverlist != NULL) { + if (glb_policy->serverlist != NULL) { /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist); + grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } /* and update the copy in the glb_lb_policy instance */ - lb_client->glb_policy->serverlist = serverlist; - } - if (lb_client->glb_policy->rr_policy == NULL) { - /* initial "handover", in this case from a null RR policy, meaning - * it'll just create the first RR policy instance */ - rr_handover_locked(exec_ctx, lb_client->glb_policy, error); - } else { - /* unref the RR policy, eventually leading to its substitution with a - * new one constructed from the received serverlist (see - * glb_rr_connectivity_changed) */ - GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, - "serverlist_received"); + glb_policy->serverlist = serverlist; + + rr_handover_locked(exec_ctx, glb_policy, error); } - gpr_mu_unlock(&lb_client->glb_policy->mu); + gpr_mu_unlock(&glb_policy->mu); } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, @@ -1170,13 +1135,13 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &lb_client->response_payload; + op->data.recv_message = &glb_policy->response_payload; op->flags = 0; op->reserved = NULL; op++; const grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->res_rcvd); /* loop */ + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->res_rcvd); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); return; } @@ -1185,42 +1150,105 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_log(GPR_ERROR, "Invalid LB response received: '%s'", gpr_dump_slice(response_slice, GPR_DUMP_ASCII)); gpr_slice_unref(response_slice); - - /* Disconnect from server returning invalid response. */ - op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - op->flags = 0; - op->reserved = NULL; - op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->close_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); + grpc_call_cancel(glb_policy->lb_call, NULL); + /* srv_status_rcvd_cb will pick up the cancellation and clean up */ } - /* empty payload: call cancelled by server. Cleanups happening in - * srv_status_rcvd_cb */ + /* else, empty payload: call cancelled by server. */ + grpc_metadata_array_destroy(&glb_policy->initial_metadata_recv); } -static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "Close from LB client sent. Waiting from server status now"); +static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + gpr_mu_lock(&glb_policy->mu); + + if (!glb_policy->shutting_down) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", + (void *)glb_policy); + } + GPR_ASSERT(glb_policy->lb_call == NULL); + query_for_backends_locked(exec_ctx, glb_policy); } + gpr_mu_unlock(&glb_policy->mu); + + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "grpclb_on_retry_timer"); } static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; + glb_lb_policy *glb_policy = arg; + gpr_mu_lock(&glb_policy->mu); + + GPR_ASSERT(glb_policy->lb_call != NULL); + if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "status from lb server received. Status = %d, Details = '%s', " - "Capacity " - "= %lu", - lb_client->status, lb_client->status_details, - (unsigned long)lb_client->status_details_capacity); + gpr_log(GPR_DEBUG, + "Status from LB server received. Status = %d, Details = '%s', " + "(call: %p)", + glb_policy->lb_call_status, glb_policy->lb_call_status_details, + (void *)glb_policy->lb_call); + } + + if (glb_policy->lb_call_status == GRPC_STATUS_UNIMPLEMENTED) { + char *failing_server = grpc_call_get_peer(glb_policy->lb_call); + char *error_desc; + gpr_asprintf(&error_desc, "Invalid LB server '%s'", failing_server); + gpr_free(failing_server); + /* flush pending ops */ + pending_pick *pp; + while ((pp = glb_policy->pending_picks)) { + glb_policy->pending_picks = pp->next; + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Cancelling pending pick: %s", error_desc); + } + grpc_exec_ctx_sched(exec_ctx, + &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_CREATE(error_desc), NULL); + } + + pending_ping *pping; + while ((pping = glb_policy->pending_pings)) { + glb_policy->pending_pings = pping->next; + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Cancelling pending ping: %s", error_desc); + } + grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_CREATE(error_desc), NULL); + } + gpr_free(error_desc); + } + + const bool was_cancelled = + (glb_policy->lb_call_status == GRPC_STATUS_CANCELLED); + + /* We need to performe cleanups no matter what. */ + lb_client_destroy(glb_policy); + + if (!glb_policy->shutting_down) { + GPR_ASSERT(!was_cancelled); + /* if we aren't shutting down, restart the LB client call after some time */ + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec next_try = + gpr_backoff_step(&glb_policy->lb_call_backoff_state, now); + if (grpc_lb_glb_trace) { + gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...", + (void *)glb_policy); + gpr_timespec timeout = gpr_time_sub(next_try, now); + if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { + gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.", + timeout.tv_sec, timeout.tv_nsec); + } else { + gpr_log(GPR_DEBUG, "... retrying immediately."); + } + } + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); + grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, + lb_call_on_retry_timer, glb_policy, now); } - /* TODO(dgq): deal with stream termination properly (fire up another one? - * fail the original call?) */ + gpr_mu_unlock(&glb_policy->mu); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "srv_status_rcvd_cb"); } /* Code wiring the policy with the rest of the core */ diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 37a9b18b970..5f530d54fe8 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -120,6 +120,8 @@ typedef struct { grpc_connectivity_state connectivity_state; /** the subchannel's target user data */ void *user_data; + /** vtable to operate over \a user_data */ + grpc_lb_user_data_vtable user_data_vtable; } subchannel_data; struct round_robin_lb_policy { @@ -186,9 +188,13 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - (void *)p->ready_list_last_pick, - (void *)p->ready_list_last_pick->subchannel); + gpr_log(GPR_DEBUG, + "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, " + "CSC %p)", + (void *)p, (void *)p->ready_list_last_pick, + (void *)p->ready_list_last_pick->subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->ready_list_last_pick->subchannel)); } } @@ -255,9 +261,15 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *elem; + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void*)pol); + } + for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); + sd->user_data_vtable.destroy(sd->user_data); gpr_free(sd); } @@ -285,6 +297,9 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { size_t i; gpr_mu_lock(&p->mu); + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", pol); + } p->shutdown = 1; while ((pp = p->pending_picks)) { @@ -296,7 +311,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE("Channel Shutdown"), "shutdown"); + GRPC_ERROR_CREATE("Channel Shutdown"), "rr_shutdown"); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, @@ -395,6 +410,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *pp; ready_list *selected; gpr_mu_lock(&p->mu); + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_INFO, "Round Robin %p trying to pick", pol); + } + if ((selected = peek_next_connected_locked(p))) { /* readily available, report right away */ *target = GRPC_CONNECTED_SUBCHANNEL_REF( @@ -435,7 +455,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; - ready_list *selected; int unref = 0; @@ -456,12 +475,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - selected = peek_next_connected_locked(p); + ready_list *selected = peek_next_connected_locked(p); + GPR_ASSERT(selected != NULL); if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ advance_last_picked_locked(p); } + while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -653,7 +674,9 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - sd->user_data = addresses->addresses[i].user_data; + sd->user_data_vtable = *addresses->user_data_vtable; + sd->user_data = + sd->user_data_vtable.copy(addresses->addresses[i].user_data); ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 57e1a8ec01f..d0ac72a0114 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -347,7 +347,7 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); // Clean up. GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, - "client_channel_factory_create_channel"); + "secure_client_channel_factory_create_channel"); grpc_channel_args_destroy(new_args); grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 0eca46eb525..ebf72a3abb4 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -210,11 +210,11 @@ void grpc_security_connector_unref(grpc_security_connector *sc) { } static void connector_pointer_arg_destroy(void *p) { - GRPC_SECURITY_CONNECTOR_UNREF(p, "connector_pointer_arg"); + GRPC_SECURITY_CONNECTOR_UNREF(p, "connector_pointer_arg_destroy"); } static void *connector_pointer_arg_copy(void *p) { - return GRPC_SECURITY_CONNECTOR_REF(p, "connector_pointer_arg"); + return GRPC_SECURITY_CONNECTOR_REF(p, "connector_pointer_arg_copy"); } static int connector_pointer_cmp(void *a, void *b) { return GPR_ICMP(a, b); } From 7ec291330c4442bc2d4adc1f93ff8f7bd9149c14 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 1 Nov 2016 04:09:05 +0100 Subject: [PATCH 2/6] PR comments --- src/core/ext/lb_policy/grpclb/grpclb.c | 253 ++++++++---------- .../ext/lb_policy/round_robin/round_robin.c | 14 +- test/cpp/grpclb/grpclb_test.cc | 16 +- 3 files changed, 133 insertions(+), 150 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index d2af27e7bfc..9a608fd4776 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -47,15 +47,12 @@ * and initiates the internal communication with the LB server. In particular, * it's responsible for instantiating the internal *streaming* call to the LB * server (whichever address from {a1..an} pick-first chose). This call is - * serviced by two callbacks, \a srv_status_rcvd and \a res_rcvd. The former - * will be called when the call to the LB server completes. This can happen if - * the LB server closes the connection or if this policy itself cancels the call - * (for example because it's shutting down). If the call fails with - * UNIMPLEMENTED, the original picks/pings will fail. This signals that there's - * a misconfiguration somewhere: at least one of {a1..an} isn't an LB server, - * which contradicts the LB bit being set. If the internal call times out, the - * usual behavior of pick-first applies, continuing to pick from the list - * {a1..an}. + * serviced by two callbacks, \a lb_on_server_status_received and \a + * lb_on_response_received. The former will be called when the call to the LB + * server completes. This can happen if the LB server closes the connection or + * if this policy itself cancels the call (for example because it's shutting + * down).If the internal call times out, the usual behavior of pick-first + * applies, continuing to pick from the list {a1..an}. * * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a * res_recv. An invalid one results in the termination of the streaming call. A @@ -80,10 +77,10 @@ * Once a RR policy instance is in place (and getting updated as described), * calls to for a pick, a ping or a cancellation will be serviced right away by * forwarding them to the RR instance. Any time there's no RR policy available - * (ie, right after the creation of the gRPCLB policy, if an empty serverlist - * is received, etc), pick/ping requests are added to a list of pending - * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the - * moment the RR policy instance becomes available. + * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is + * received, etc), pick/ping requests are added to a list of pending picks/pings + * to be flushed and serviced as part of \a rr_handover_locked() the moment the + * RR policy instance becomes available. * * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the * high level design and details. */ @@ -311,32 +308,28 @@ typedef struct glb_lb_policy { /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ - /* called once initial metadata's been sent */ - grpc_closure md_sent; + /* Status from the LB server has been received. This signals the end of the LB + * call. */ + grpc_closure lb_on_server_status_received; - /* called once the LoadBalanceRequest has been sent to the LB server. See - * src/proto/grpc/.../load_balancer.proto */ - grpc_closure req_sent; - - /* A response from the LB server has been received (or error). Process it */ - grpc_closure res_rcvd; - - /* ... and the status from the LB server has been received */ - grpc_closure srv_status_rcvd; + /* A response from the LB server has been received. Process it */ + grpc_closure lb_on_response_received; grpc_call *lb_call; /* streaming call to the LB server, */ - grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */ - grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */ + grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ + grpc_metadata_array + lb_trailing_metadata_recv; /* trailing MD from LB server */ /* what's being sent to the LB server. Note that its value may vary if the LB * server indicates a redirect. */ - grpc_byte_buffer *request_payload; + grpc_byte_buffer *lb_request_payload; - /* response from the LB server, if any. Processed in res_recv_cb() */ - grpc_byte_buffer *response_payload; + /* response from the LB server, if any. Processed in lb_on_response_received() + */ + grpc_byte_buffer *lb_response_payload; - /* the call's status and status detailset in srv_status_rcvd_cb() */ + /* the call's status and status detailset in lb_on_server_status_received() */ grpc_status_code lb_call_status; char *lb_call_status_details; size_t lb_call_status_details_capacity; @@ -346,7 +339,6 @@ typedef struct glb_lb_policy { /** LB call retry timer */ grpc_timer lb_call_retry_timer; - } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -395,6 +387,28 @@ static int lb_token_cmp(void *token1, void *token2) { static const grpc_lb_user_data_vtable lb_token_vtable = { lb_token_copy, lb_token_destroy, lb_token_cmp}; +static void parse_server(const grpc_grpclb_server *server, + grpc_resolved_address *addr) { + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; + memset(addr, 0, sizeof(*addr)); + if (ip->size == 4) { + addr->len = sizeof(struct sockaddr_in); + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr; + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + addr->len = sizeof(struct sockaddr_in6); + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr; + addr6->sin6_family = AF_INET; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } +} + /* Returns addresses extracted from \a serverlist. */ static grpc_lb_addresses *process_serverlist( const grpc_grpclb_serverlist *serverlist) { @@ -421,25 +435,8 @@ static grpc_lb_addresses *process_serverlist( if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; /* address processing */ - const uint16_t netorder_port = htons((uint16_t)server->port); - /* the addresses are given in binary format (a in(6)_addr struct) in - * server->ip_address.bytes. */ - const grpc_grpclb_ip_address *ip = &server->ip_address; grpc_resolved_address addr; - memset(&addr, 0, sizeof(addr)); - if (ip->size == 4) { - addr.len = sizeof(struct sockaddr_in); - struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr; - addr4->sin_family = AF_INET; - memcpy(&addr4->sin_addr, ip->bytes, ip->size); - addr4->sin_port = netorder_port; - } else if (ip->size == 16) { - addr.len = sizeof(struct sockaddr_in6); - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr; - addr6->sin6_family = AF_INET; - memcpy(&addr6->sin6_addr, ip->bytes, ip->size); - addr6->sin6_port = netorder_port; - } + parse_server(server, &addr); /* lb token processing */ void *user_data; @@ -538,7 +535,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, } if (glb_policy->rr_policy != NULL) { /* if we are phasing out an existing RR instance, unref it. */ - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover_locked"); + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); } glb_policy->rr_policy = @@ -565,6 +562,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, rr_connectivity->state, GRPC_ERROR_REF(error), "rr_handover"); /* subscribe */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectiviby_cb"); grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); @@ -606,7 +604,8 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; - if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN) { + if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN && + !glb_policy->shutting_down) { gpr_mu_lock(&glb_policy->mu); /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, @@ -618,6 +617,8 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, &rr_conn_data->on_change); gpr_mu_unlock(&glb_policy->mu); } else { + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "rr_connectiviby_cb"); gpr_free(rr_conn_data); } } @@ -778,7 +779,8 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->started_picking) { if (glb_policy->lb_call != NULL) { grpc_call_cancel(glb_policy->lb_call, NULL); - /* srv_status_rcvd_cb will pick up the cancellation and clean up */ + /* lb_on_server_status_received will pick up the cancellation and clean up + */ } } @@ -950,10 +952,11 @@ static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&glb_policy->mu); } -static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void lb_client_init(glb_lb_policy *glb_policy) { +static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void lb_call_init(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); @@ -966,13 +969,13 @@ static void lb_client_init(glb_lb_policy *glb_policy) { "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name, glb_policy->deadline, NULL); - grpc_metadata_array_init(&glb_policy->initial_metadata_recv); - grpc_metadata_array_init(&glb_policy->trailing_metadata_recv); + grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv); + grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv); grpc_grpclb_request *request = grpc_grpclb_request_create(glb_policy->server_name); gpr_slice request_payload_slice = grpc_grpclb_request_encode(request); - glb_policy->request_payload = + glb_policy->lb_request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); gpr_slice_unref(request_payload_slice); grpc_grpclb_request_destroy(request); @@ -980,24 +983,25 @@ static void lb_client_init(glb_lb_policy *glb_policy) { glb_policy->lb_call_status_details = NULL; glb_policy->lb_call_status_details_capacity = 0; - grpc_closure_init(&glb_policy->srv_status_rcvd, srv_status_rcvd_cb, - glb_policy); - grpc_closure_init(&glb_policy->res_rcvd, res_recv_cb, glb_policy); + grpc_closure_init(&glb_policy->lb_on_server_status_received, + lb_on_server_status_received, glb_policy); + grpc_closure_init(&glb_policy->lb_on_response_received, + lb_on_response_received, glb_policy); gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); } -static void lb_client_destroy(glb_lb_policy *glb_policy) { +static void lb_call_destroy(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_call != NULL); grpc_call_destroy(glb_policy->lb_call); glb_policy->lb_call = NULL; - grpc_metadata_array_destroy(&glb_policy->initial_metadata_recv); - grpc_metadata_array_destroy(&glb_policy->trailing_metadata_recv); + grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv); + grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv); - grpc_byte_buffer_destroy(glb_policy->request_payload); + grpc_byte_buffer_destroy(glb_policy->lb_request_payload); gpr_free(glb_policy->lb_call_status_details); } @@ -1007,8 +1011,10 @@ static void lb_client_destroy(glb_lb_policy *glb_policy) { static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_server_status_received */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "query_for_backends_locked"); - lb_client_init(glb_policy); + lb_call_init(glb_policy); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", @@ -1028,21 +1034,21 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &glb_policy->initial_metadata_recv; + op->data.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; - GPR_ASSERT(glb_policy->request_payload != NULL); + GPR_ASSERT(glb_policy->lb_request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = glb_policy->request_payload; + op->data.send_message = glb_policy->lb_request_payload; op->flags = 0; op->reserved = NULL; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = - &glb_policy->trailing_metadata_recv; + &glb_policy->lb_trailing_metadata_recv; op->data.recv_status_on_client.status = &glb_policy->lb_call_status; op->data.recv_status_on_client.status_details = &glb_policy->lb_call_status_details; @@ -1051,37 +1057,38 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute(exec_ctx, glb_policy->lb_call, - ops, (size_t)(op - ops), - &glb_policy->srv_status_rcvd); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_server_status_received); GPR_ASSERT(GRPC_CALL_OK == call_error); op = ops; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &glb_policy->response_payload; + op->data.recv_message = &glb_policy->lb_response_payload; op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute(exec_ctx, glb_policy->lb_call, - ops, (size_t)(op - ops), - &glb_policy->res_rcvd); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_response_received); GPR_ASSERT(GRPC_CALL_OK == call_error); } -static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { glb_lb_policy *glb_policy = arg; grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - if (glb_policy->response_payload != NULL) { + if (glb_policy->lb_response_payload != NULL) { gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside - * glb_policy->response_payload, for a serverlist. */ + * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, glb_policy->response_payload); + grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); - grpc_byte_buffer_destroy(glb_policy->response_payload); + grpc_byte_buffer_destroy(glb_policy->lb_response_payload); grpc_grpclb_serverlist *serverlist = grpc_grpclb_response_parse_serverlist(response_slice); if (serverlist != NULL) { @@ -1090,15 +1097,9 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Serverlist with %lu servers received", (unsigned long)serverlist->num_servers); - /* TODO(dgq): this needs to work with ipv6. */ for (size_t i = 0; i < serverlist->num_servers; ++i) { grpc_resolved_address addr; - struct sockaddr_in *sa = (struct sockaddr_in *)&addr.addr; - addr.len = sizeof(struct sockaddr_in); - sa->sin_family = AF_INET; - sa->sin_port = htons((uint16_t)serverlist->servers[i]->port); - memcpy(&sa->sin_addr, serverlist->servers[i]->ip_address.bytes, - serverlist->servers[i]->ip_address.size); + parse_server(serverlist->servers[i], &addr); char *ipport; grpc_sockaddr_to_string(&ipport, &addr, false); gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); @@ -1132,29 +1133,25 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { "response with > 0 servers is received"); } } - - /* keep listening for serverlist updates */ - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &glb_policy->response_payload; - op->flags = 0; - op->reserved = NULL; - op++; - const grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), - &glb_policy->res_rcvd); /* loop */ - GPR_ASSERT(GRPC_CALL_OK == call_error); - return; + } else { /* serverlist == NULL */ + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", + gpr_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + gpr_slice_unref(response_slice); } - GPR_ASSERT(serverlist == NULL); - gpr_log(GPR_ERROR, "Invalid LB response received: '%s'", - gpr_dump_slice(response_slice, GPR_DUMP_ASCII)); - gpr_slice_unref(response_slice); - grpc_call_cancel(glb_policy->lb_call, NULL); - /* srv_status_rcvd_cb will pick up the cancellation and clean up */ + /* keep listening for serverlist updates */ + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &glb_policy->lb_response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + const grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_response_received); /* loop */ + GPR_ASSERT(GRPC_CALL_OK == call_error); + return; } /* else, empty payload: call cancelled by server. */ - grpc_metadata_array_destroy(&glb_policy->initial_metadata_recv); } static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, @@ -1176,8 +1173,8 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, "grpclb_on_retry_timer"); } -static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { glb_lb_policy *glb_policy = arg; gpr_mu_lock(&glb_policy->mu); @@ -1191,40 +1188,11 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, (void *)glb_policy->lb_call); } - if (glb_policy->lb_call_status == GRPC_STATUS_UNIMPLEMENTED) { - char *failing_server = grpc_call_get_peer(glb_policy->lb_call); - char *error_desc; - gpr_asprintf(&error_desc, "Invalid LB server '%s'", failing_server); - gpr_free(failing_server); - /* flush pending ops */ - pending_pick *pp; - while ((pp = glb_policy->pending_picks)) { - glb_policy->pending_picks = pp->next; - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Cancelling pending pick: %s", error_desc); - } - grpc_exec_ctx_sched(exec_ctx, - &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_CREATE(error_desc), NULL); - } - - pending_ping *pping; - while ((pping = glb_policy->pending_pings)) { - glb_policy->pending_pings = pping->next; - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Cancelling pending ping: %s", error_desc); - } - grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_CREATE(error_desc), NULL); - } - gpr_free(error_desc); - } - const bool was_cancelled = (glb_policy->lb_call_status == GRPC_STATUS_CANCELLED); /* We need to performe cleanups no matter what. */ - lb_client_destroy(glb_policy); + lb_call_destroy(glb_policy); if (!glb_policy->shutting_down) { GPR_ASSERT(!was_cancelled); @@ -1248,7 +1216,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, lb_call_on_retry_timer, glb_policy, now); } gpr_mu_unlock(&glb_policy->mu); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "srv_status_rcvd_cb"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_server_status_received"); } /* Code wiring the policy with the rest of the core */ diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 5f530d54fe8..36112874502 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -121,7 +121,7 @@ typedef struct { /** the subchannel's target user data */ void *user_data; /** vtable to operate over \a user_data */ - grpc_lb_user_data_vtable user_data_vtable; + const grpc_lb_user_data_vtable *user_data_vtable; } subchannel_data; struct round_robin_lb_policy { @@ -269,7 +269,9 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); - sd->user_data_vtable.destroy(sd->user_data); + if (sd->user_data_vtable != NULL) { + sd->user_data_vtable->destroy(sd->user_data); + } gpr_free(sd); } @@ -298,7 +300,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_lock(&p->mu); if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", pol); + gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); } p->shutdown = 1; @@ -412,7 +414,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_lock(&p->mu); if (grpc_lb_round_robin_trace) { - gpr_log(GPR_INFO, "Round Robin %p trying to pick", pol); + gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } if ((selected = peek_next_connected_locked(p))) { @@ -674,9 +676,9 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - sd->user_data_vtable = *addresses->user_data_vtable; + sd->user_data_vtable = addresses->user_data_vtable; sd->user_data = - sd->user_data_vtable.copy(addresses->addresses[i].user_data); + sd->user_data_vtable->copy(addresses->addresses[i].user_data); ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index b6056f9ae4c..3a92eb56b7f 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -76,10 +76,22 @@ extern "C" { // - Send a serverlist with faulty ip:port addresses (port > 2^16, etc). // - Test reception of invalid serverlist // - Test pinging -// - Test against a non-LB server. That server should return UNIMPLEMENTED and -// the call should fail. +// - Test against a non-LB server. // - Random LB server closing the stream unexpectedly. // - Test using DNS-resolvable names (localhost?) +// +// Findings from end to end testing to be covered here: +// - Handling of LB servers restart, including reconnection after backing-off +// retries. +// - Destruction of load balanced channel (and therefore of grpclb instance) +// while: +// 1) the internal LB call is still active. This should work by virtue +// of the weak reference the LB call holds. The call should be terminated as +// part of the grpclb shutdown process. +// 2) the retry timer is active. Again, the weak reference it holds should +// prevent a premature call to \a glb_destroy. +// - Restart of backend servers with no changes to serverlist. This exercises +// the RR handover mechanism. namespace grpc { namespace { From 246c564bb89ec9f52e9ddbdd8a2df4e6df259e08 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 1 Nov 2016 11:16:52 -0700 Subject: [PATCH 3/6] PR comments, take two --- src/core/ext/lb_policy/grpclb/grpclb.c | 37 +++++++++---------- .../ext/lb_policy/round_robin/round_robin.c | 3 +- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 9a608fd4776..bfcb9e6418c 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -51,7 +51,7 @@ * lb_on_response_received. The former will be called when the call to the LB * server completes. This can happen if the LB server closes the connection or * if this policy itself cancels the call (for example because it's shutting - * down).If the internal call times out, the usual behavior of pick-first + * down). If the internal call times out, the usual behavior of pick-first * applies, continuing to pick from the list {a1..an}. * * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a @@ -325,11 +325,10 @@ typedef struct glb_lb_policy { * server indicates a redirect. */ grpc_byte_buffer *lb_request_payload; - /* response from the LB server, if any. Processed in lb_on_response_received() - */ + /* response the LB server, if any. Processed in lb_on_response_received() */ grpc_byte_buffer *lb_response_payload; - /* the call's status and status detailset in lb_on_server_status_received() */ + /* call status code and details, set in lb_on_server_status_received() */ grpc_status_code lb_call_status; char *lb_call_status_details; size_t lb_call_status_details_capacity; @@ -1013,7 +1012,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(glb_policy->lb_channel != NULL); /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref * count goes to zero) to be unref'd in lb_on_server_status_received */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "query_for_backends_locked"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "query_for_backends"); lb_call_init(glb_policy); if (grpc_lb_glb_trace) { @@ -1139,19 +1138,21 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, gpr_slice_unref(response_slice); } - /* keep listening for serverlist updates */ - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &glb_policy->lb_response_payload; - op->flags = 0; - op->reserved = NULL; - op++; - const grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_on_response_received); /* loop */ - GPR_ASSERT(GRPC_CALL_OK == call_error); + if (!glb_policy->shutting_down) { + /* keep listening for serverlist updates */ + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &glb_policy->lb_response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + const grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_response_received); /* loop */ + GPR_ASSERT(GRPC_CALL_OK == call_error); + } return; } - /* else, empty payload: call cancelled by server. */ + /* else, empty payload: call cancelled. */ } static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, @@ -1188,14 +1189,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, (void *)glb_policy->lb_call); } - const bool was_cancelled = - (glb_policy->lb_call_status == GRPC_STATUS_CANCELLED); - /* We need to performe cleanups no matter what. */ lb_call_destroy(glb_policy); if (!glb_policy->shutting_down) { - GPR_ASSERT(!was_cancelled); /* if we aren't shutting down, restart the LB client call after some time */ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 36112874502..9b20edec92c 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -269,7 +269,8 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); - if (sd->user_data_vtable != NULL) { + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable); sd->user_data_vtable->destroy(sd->user_data); } gpr_free(sd); From e224a76a6e7523012d8be899240c4cee1ba6f2b4 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 1 Nov 2016 13:00:58 -0700 Subject: [PATCH 4/6] PR comments, take three --- src/core/ext/client_channel/lb_policy.h | 6 +++++ src/core/ext/lb_policy/grpclb/grpclb.c | 25 ++++++++++++------- .../ext/lb_policy/round_robin/round_robin.c | 2 +- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h index 54ad7797920..120c641edc6 100644 --- a/src/core/ext/client_channel/lb_policy.h +++ b/src/core/ext/client_channel/lb_policy.h @@ -109,10 +109,16 @@ struct grpc_lb_policy_vtable { /*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG + +/* Strong references: the policy will shutdown when they reach zero */ #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) + +/* Weak references: they don't prevent the shutdown of the LB policy. When no + * strong references are left but there are still weak ones, shutdown is called. + * Once the weak reference also reaches zero, the LB policy is destroyed. */ #define GRPC_LB_POLICY_WEAK_REF(p, r) \ grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index bfcb9e6418c..451fbd65de3 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -561,7 +561,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, rr_connectivity->state, GRPC_ERROR_REF(error), "rr_handover"); /* subscribe */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectiviby_cb"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); @@ -609,15 +609,15 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_conn_data->state, GRPC_ERROR_REF(error), - "glb_rr_connectivity_changed"); - /* resubscribe */ + "rr_connectivity_cb"); + /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, &rr_conn_data->on_change); gpr_mu_unlock(&glb_policy->mu); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "rr_connectiviby_cb"); + "rr_connectivity_cb"); gpr_free(rr_conn_data); } } @@ -1010,9 +1010,6 @@ static void lb_call_destroy(glb_lb_policy *glb_policy) { static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); - /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref - * count goes to zero) to be unref'd in lb_on_server_status_received */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "query_for_backends"); lb_call_init(glb_policy); if (grpc_lb_glb_trace) { @@ -1056,6 +1053,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_server_status_received */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); @@ -1067,6 +1067,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; + /* take another weak ref to be unref'd in lb_on_response_received */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); @@ -1145,14 +1147,19 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, op->flags = 0; op->reserved = NULL; op++; + /* reuse the "lb_on_response_received" weak ref taken in + * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } - return; + } else { /* empty payload: call cancelled. */ + /* dispose of the "lb_on_response_received" weak ref taken in + * query_for_backends_locked() and reused in every reception loop */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_empty_payload"); } - /* else, empty payload: call cancelled. */ } static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 9b20edec92c..4700267ba2a 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -270,7 +270,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { subchannel_data *sd = p->subchannels[i]; GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable); + GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(sd->user_data); } gpr_free(sd); From 2ff8180b33dfa0fc4191d2608cae4e08860543df Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 1 Nov 2016 21:58:16 -0700 Subject: [PATCH 5/6] clang-format --- src/core/ext/lb_policy/round_robin/round_robin.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 4700267ba2a..b0c461730b3 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -263,7 +263,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { ready_list *elem; if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void*)pol); + gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } for (size_t i = 0; i < p->num_subchannels; i++) { From 6e7baf75dd8ab88d452e7001c41699857afd6e98 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 1 Nov 2016 23:08:36 -0700 Subject: [PATCH 6/6] generate projects --- tools/run_tests/tests.json | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 733bfe3b8fd..39daf859b46 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -3003,6 +3003,7 @@ ], "cpu_cost": 1.0, "exclude_configs": [], + "exclude_iomgrs": [], "flaky": false, "gtest": false, "language": "c++",