|
|
|
@ -123,6 +123,7 @@ |
|
|
|
|
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 |
|
|
|
|
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 |
|
|
|
|
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2 |
|
|
|
|
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 |
|
|
|
|
|
|
|
|
|
grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb"); |
|
|
|
|
|
|
|
|
@ -299,6 +300,10 @@ typedef struct glb_lb_policy { |
|
|
|
|
/** timeout in milliseconds for the LB call. 0 means no deadline. */ |
|
|
|
|
int lb_call_timeout_ms; |
|
|
|
|
|
|
|
|
|
/** timeout in milliseconds for before using fallback backend addresses.
|
|
|
|
|
* 0 means not using fallback. */ |
|
|
|
|
int lb_fallback_timeout_ms; |
|
|
|
|
|
|
|
|
|
/** for communicating with the LB server */ |
|
|
|
|
grpc_channel *lb_channel; |
|
|
|
|
|
|
|
|
@ -325,6 +330,9 @@ typedef struct glb_lb_policy { |
|
|
|
|
* Otherwise, we delegate to the RR policy. */ |
|
|
|
|
size_t serverlist_index; |
|
|
|
|
|
|
|
|
|
/** stores the backend addresses from the resolver */ |
|
|
|
|
grpc_lb_addresses *fallback_backend_addresses; |
|
|
|
|
|
|
|
|
|
/** list of picks that are waiting on RR's policy connectivity */ |
|
|
|
|
pending_pick *pending_picks; |
|
|
|
|
|
|
|
|
@ -345,6 +353,9 @@ typedef struct glb_lb_policy { |
|
|
|
|
/** is \a lb_call_retry_timer active? */ |
|
|
|
|
bool retry_timer_active; |
|
|
|
|
|
|
|
|
|
/** is \a lb_fallback_timer active? */ |
|
|
|
|
bool fallback_timer_active; |
|
|
|
|
|
|
|
|
|
/** called upon changes to the LB channel's connectivity. */ |
|
|
|
|
grpc_closure lb_channel_on_connectivity_changed; |
|
|
|
|
|
|
|
|
@ -364,6 +375,9 @@ typedef struct glb_lb_policy { |
|
|
|
|
/* LB call retry timer callback. */ |
|
|
|
|
grpc_closure lb_on_call_retry; |
|
|
|
|
|
|
|
|
|
/* LB fallback timer callback. */ |
|
|
|
|
grpc_closure lb_on_fallback; |
|
|
|
|
|
|
|
|
|
grpc_call *lb_call; /* streaming call to the LB server, */ |
|
|
|
|
|
|
|
|
|
grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ |
|
|
|
@ -387,6 +401,9 @@ typedef struct glb_lb_policy { |
|
|
|
|
/** LB call retry timer */ |
|
|
|
|
grpc_timer lb_call_retry_timer; |
|
|
|
|
|
|
|
|
|
/** LB fallback timer */ |
|
|
|
|
grpc_timer lb_fallback_timer; |
|
|
|
|
|
|
|
|
|
bool seen_initial_response; |
|
|
|
|
|
|
|
|
|
/* Stats for client-side load reporting. Should be unreffed and
|
|
|
|
@ -532,6 +549,32 @@ static grpc_lb_addresses *process_serverlist_locked( |
|
|
|
|
return lb_addresses; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Returns the backend addresses extracted from the given addresses */ |
|
|
|
|
static grpc_lb_addresses *extract_backend_addresses_locked( |
|
|
|
|
grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) { |
|
|
|
|
/* first pass: count the number of backend addresses */ |
|
|
|
|
size_t num_backends = 0; |
|
|
|
|
for (size_t i = 0; i < addresses->num_addresses; ++i) { |
|
|
|
|
if (!addresses->addresses[i].is_balancer) { |
|
|
|
|
++num_backends; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* second pass: actually populate the addresses and (empty) LB tokens */ |
|
|
|
|
grpc_lb_addresses *backend_addresses = |
|
|
|
|
grpc_lb_addresses_create(num_backends, &lb_token_vtable); |
|
|
|
|
size_t num_copied = 0; |
|
|
|
|
for (size_t i = 0; i < addresses->num_addresses; ++i) { |
|
|
|
|
if (addresses->addresses[i].is_balancer) continue; |
|
|
|
|
const grpc_resolved_address *addr = &addresses->addresses[i].address; |
|
|
|
|
grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, |
|
|
|
|
addr->len, false /* is_balancer */, |
|
|
|
|
NULL /* balancer_name */, |
|
|
|
|
(void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); |
|
|
|
|
++num_copied; |
|
|
|
|
} |
|
|
|
|
return backend_addresses; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void update_lb_connectivity_status_locked( |
|
|
|
|
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
grpc_connectivity_state rr_state, grpc_error *rr_state_error) { |
|
|
|
@ -599,35 +642,38 @@ static bool pick_from_internal_rr_locked( |
|
|
|
|
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
const grpc_lb_policy_pick_args *pick_args, bool force_async, |
|
|
|
|
grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { |
|
|
|
|
// Look at the index into the serverlist to see if we should drop this call.
|
|
|
|
|
grpc_grpclb_server *server = |
|
|
|
|
glb_policy->serverlist->servers[glb_policy->serverlist_index++]; |
|
|
|
|
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { |
|
|
|
|
glb_policy->serverlist_index = 0; // Wrap-around.
|
|
|
|
|
} |
|
|
|
|
if (server->drop) { |
|
|
|
|
// Not using the RR policy, so unref it.
|
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", |
|
|
|
|
(intptr_t)wc_arg->rr_policy); |
|
|
|
|
// Check for drops if we are not using fallback backend addresses.
|
|
|
|
|
if (glb_policy->serverlist != NULL) { |
|
|
|
|
// Look at the index into the serverlist to see if we should drop this call.
|
|
|
|
|
grpc_grpclb_server *server = |
|
|
|
|
glb_policy->serverlist->servers[glb_policy->serverlist_index++]; |
|
|
|
|
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { |
|
|
|
|
glb_policy->serverlist_index = 0; // Wrap-around.
|
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); |
|
|
|
|
// Update client load reporting stats to indicate the number of
|
|
|
|
|
// dropped calls. Note that we have to do this here instead of in
|
|
|
|
|
// the client_load_reporting filter, because we do not create a
|
|
|
|
|
// subchannel call (and therefore no client_load_reporting filter)
|
|
|
|
|
// for dropped calls.
|
|
|
|
|
grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token, |
|
|
|
|
wc_arg->client_stats); |
|
|
|
|
grpc_grpclb_client_stats_unref(wc_arg->client_stats); |
|
|
|
|
if (force_async) { |
|
|
|
|
GPR_ASSERT(wc_arg->wrapped_closure != NULL); |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); |
|
|
|
|
if (server->drop) { |
|
|
|
|
// Not using the RR policy, so unref it.
|
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", |
|
|
|
|
(intptr_t)wc_arg->rr_policy); |
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); |
|
|
|
|
// Update client load reporting stats to indicate the number of
|
|
|
|
|
// dropped calls. Note that we have to do this here instead of in
|
|
|
|
|
// the client_load_reporting filter, because we do not create a
|
|
|
|
|
// subchannel call (and therefore no client_load_reporting filter)
|
|
|
|
|
// for dropped calls.
|
|
|
|
|
grpc_grpclb_client_stats_add_call_dropped_locked( |
|
|
|
|
server->load_balance_token, wc_arg->client_stats); |
|
|
|
|
grpc_grpclb_client_stats_unref(wc_arg->client_stats); |
|
|
|
|
if (force_async) { |
|
|
|
|
GPR_ASSERT(wc_arg->wrapped_closure != NULL); |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); |
|
|
|
|
gpr_free(wc_arg->free_when_done); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
gpr_free(wc_arg->free_when_done); |
|
|
|
|
return false; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
gpr_free(wc_arg->free_when_done); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
// Pick via the RR policy.
|
|
|
|
|
const bool pick_done = grpc_lb_policy_pick_locked( |
|
|
|
@ -665,8 +711,18 @@ static bool pick_from_internal_rr_locked( |
|
|
|
|
|
|
|
|
|
static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_lb_policy *glb_policy) { |
|
|
|
|
grpc_lb_addresses *addresses = |
|
|
|
|
process_serverlist_locked(exec_ctx, glb_policy->serverlist); |
|
|
|
|
grpc_lb_addresses *addresses; |
|
|
|
|
if (glb_policy->serverlist != NULL) { |
|
|
|
|
GPR_ASSERT(glb_policy->serverlist->num_servers > 0); |
|
|
|
|
addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist); |
|
|
|
|
} else { |
|
|
|
|
// If rr_handover_locked() is invoked when we haven't received any
|
|
|
|
|
// serverlist from the balancer, we use the fallback backends returned by
|
|
|
|
|
// the resolver. Note that the fallback backend list may be empty, in which
|
|
|
|
|
// case the new round_robin policy will keep the requested picks pending.
|
|
|
|
|
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); |
|
|
|
|
addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(addresses != NULL); |
|
|
|
|
grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args)); |
|
|
|
|
args->client_channel_factory = glb_policy->cc_factory; |
|
|
|
@ -772,8 +828,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
/* glb_policy->rr_policy may be NULL (initial handover) */ |
|
|
|
|
static void rr_handover_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_lb_policy *glb_policy) { |
|
|
|
|
GPR_ASSERT(glb_policy->serverlist != NULL && |
|
|
|
|
glb_policy->serverlist->num_servers > 0); |
|
|
|
|
if (glb_policy->shutting_down) return; |
|
|
|
|
grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); |
|
|
|
|
GPR_ASSERT(args != NULL); |
|
|
|
@ -922,6 +976,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
if (glb_policy->serverlist != NULL) { |
|
|
|
|
grpc_grpclb_destroy_serverlist(glb_policy->serverlist); |
|
|
|
|
} |
|
|
|
|
if (glb_policy->fallback_backend_addresses != NULL) { |
|
|
|
|
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); |
|
|
|
|
} |
|
|
|
|
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); |
|
|
|
|
grpc_subchannel_index_unref(); |
|
|
|
|
if (glb_policy->pending_update_args != NULL) { |
|
|
|
@ -1063,10 +1120,28 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error); |
|
|
|
|
static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_lb_policy *glb_policy); |
|
|
|
|
static void start_picking_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_lb_policy *glb_policy) { |
|
|
|
|
/* start a timer to fall back */ |
|
|
|
|
if (glb_policy->lb_fallback_timeout_ms > 0 && |
|
|
|
|
glb_policy->serverlist == NULL && !glb_policy->fallback_timer_active) { |
|
|
|
|
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); |
|
|
|
|
gpr_timespec deadline = gpr_time_add( |
|
|
|
|
now, |
|
|
|
|
gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN)); |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); |
|
|
|
|
GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, |
|
|
|
|
glb_policy, |
|
|
|
|
grpc_combiner_scheduler(glb_policy->base.combiner)); |
|
|
|
|
glb_policy->fallback_timer_active = true; |
|
|
|
|
grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline, |
|
|
|
|
&glb_policy->lb_on_fallback, now); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
glb_policy->started_picking = true; |
|
|
|
|
gpr_backoff_reset(&glb_policy->lb_call_backoff_state); |
|
|
|
|
query_for_backends_locked(exec_ctx, glb_policy); |
|
|
|
@ -1545,6 +1620,15 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
if (glb_policy->serverlist != NULL) { |
|
|
|
|
/* dispose of the old serverlist */ |
|
|
|
|
grpc_grpclb_destroy_serverlist(glb_policy->serverlist); |
|
|
|
|
} else { |
|
|
|
|
/* or dispose of the fallback */ |
|
|
|
|
grpc_lb_addresses_destroy(exec_ctx, |
|
|
|
|
glb_policy->fallback_backend_addresses); |
|
|
|
|
glb_policy->fallback_backend_addresses = NULL; |
|
|
|
|
if (glb_policy->fallback_timer_active) { |
|
|
|
|
grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer); |
|
|
|
|
glb_policy->fallback_timer_active = false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* and update the copy in the glb_lb_policy instance. This
|
|
|
|
|
* serverlist instance will be destroyed either upon the next |
|
|
|
@ -1555,9 +1639,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"Received empty server list. Picks will stay pending until " |
|
|
|
|
"a response with > 0 servers is received"); |
|
|
|
|
gpr_log(GPR_INFO, "Received empty server list, ignoring."); |
|
|
|
|
} |
|
|
|
|
grpc_grpclb_destroy_serverlist(serverlist); |
|
|
|
|
} |
|
|
|
@ -1592,6 +1674,27 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
glb_lb_policy *glb_policy = (glb_lb_policy *)arg; |
|
|
|
|
glb_policy->fallback_timer_active = false; |
|
|
|
|
/* If we receive a serverlist after the timer fires but before this callback
|
|
|
|
|
* actually runs, don't fall back. */ |
|
|
|
|
if (glb_policy->serverlist == NULL) { |
|
|
|
|
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"Falling back to use backends from resolver (grpclb %p)", |
|
|
|
|
(void *)glb_policy); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); |
|
|
|
|
rr_handover_locked(exec_ctx, glb_policy); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, |
|
|
|
|
"grpclb_fallback_timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, grpc_error *error) { |
|
|
|
|
glb_lb_policy *glb_policy = (glb_lb_policy *)arg; |
|
|
|
@ -1616,31 +1719,22 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fallback_update_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_lb_policy *glb_policy, |
|
|
|
|
const grpc_lb_addresses *addresses) { |
|
|
|
|
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); |
|
|
|
|
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); |
|
|
|
|
glb_policy->fallback_backend_addresses = |
|
|
|
|
extract_backend_addresses_locked(exec_ctx, addresses); |
|
|
|
|
if (glb_policy->lb_fallback_timeout_ms > 0 && |
|
|
|
|
!glb_policy->fallback_timer_active) { |
|
|
|
|
rr_handover_locked(exec_ctx, glb_policy); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, |
|
|
|
|
const grpc_lb_policy_args *args) { |
|
|
|
|
glb_lb_policy *glb_policy = (glb_lb_policy *)policy; |
|
|
|
|
if (glb_policy->updating_lb_channel) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"Update already in progress for grpclb %p. Deferring update.", |
|
|
|
|
(void *)glb_policy); |
|
|
|
|
} |
|
|
|
|
if (glb_policy->pending_update_args != NULL) { |
|
|
|
|
grpc_channel_args_destroy(exec_ctx, |
|
|
|
|
glb_policy->pending_update_args->args); |
|
|
|
|
gpr_free(glb_policy->pending_update_args); |
|
|
|
|
} |
|
|
|
|
glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc( |
|
|
|
|
sizeof(*glb_policy->pending_update_args)); |
|
|
|
|
glb_policy->pending_update_args->client_channel_factory = |
|
|
|
|
args->client_channel_factory; |
|
|
|
|
glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); |
|
|
|
|
glb_policy->pending_update_args->combiner = args->combiner; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
glb_policy->updating_lb_channel = true; |
|
|
|
|
// Propagate update to lb_channel (pick first).
|
|
|
|
|
const grpc_arg *arg = |
|
|
|
|
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); |
|
|
|
|
if (arg == NULL || arg->type != GRPC_ARG_POINTER) { |
|
|
|
@ -1658,13 +1752,43 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, |
|
|
|
|
"ignoring.", |
|
|
|
|
(void *)glb_policy); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
const grpc_lb_addresses *addresses = |
|
|
|
|
(const grpc_lb_addresses *)arg->value.pointer.p; |
|
|
|
|
|
|
|
|
|
if (glb_policy->serverlist == NULL) { |
|
|
|
|
// If a non-empty serverlist hasn't been received from the balancer,
|
|
|
|
|
// propagate the update to fallback_backend_addresses.
|
|
|
|
|
fallback_update_locked(exec_ctx, glb_policy, addresses); |
|
|
|
|
} else if (glb_policy->updating_lb_channel) { |
|
|
|
|
// If we have recieved serverlist from the balancer, we need to defer update
|
|
|
|
|
// when there is an in-progress one.
|
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"Update already in progress for grpclb %p. Deferring update.", |
|
|
|
|
(void *)glb_policy); |
|
|
|
|
} |
|
|
|
|
if (glb_policy->pending_update_args != NULL) { |
|
|
|
|
grpc_channel_args_destroy(exec_ctx, |
|
|
|
|
glb_policy->pending_update_args->args); |
|
|
|
|
gpr_free(glb_policy->pending_update_args); |
|
|
|
|
} |
|
|
|
|
glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc( |
|
|
|
|
sizeof(*glb_policy->pending_update_args)); |
|
|
|
|
glb_policy->pending_update_args->client_channel_factory = |
|
|
|
|
args->client_channel_factory; |
|
|
|
|
glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); |
|
|
|
|
glb_policy->pending_update_args->combiner = args->combiner; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
glb_policy->updating_lb_channel = true; |
|
|
|
|
GPR_ASSERT(glb_policy->lb_channel != NULL); |
|
|
|
|
grpc_channel_args *lb_channel_args = build_lb_channel_args( |
|
|
|
|
exec_ctx, addresses, glb_policy->response_generator, args->args); |
|
|
|
|
/* Propagate updates to the LB channel through the fake resolver */ |
|
|
|
|
/* Propagate updates to the LB channel (pick first) through the fake resolver
|
|
|
|
|
*/ |
|
|
|
|
grpc_fake_resolver_response_generator_set_response( |
|
|
|
|
exec_ctx, glb_policy->response_generator, lb_channel_args); |
|
|
|
|
grpc_channel_args_destroy(exec_ctx, lb_channel_args); |
|
|
|
@ -1767,13 +1891,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { |
|
|
|
|
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_lb_policy_factory *factory, |
|
|
|
|
grpc_lb_policy_args *args) { |
|
|
|
|
/* Count the number of gRPC-LB addresses. There must be at least one.
|
|
|
|
|
* TODO(roth): For now, we ignore non-balancer addresses, but in the |
|
|
|
|
* future, we may change the behavior such that we fall back to using |
|
|
|
|
* the non-balancer addresses if we cannot reach any balancers. In the |
|
|
|
|
* fallback case, we should use the LB policy indicated by |
|
|
|
|
* GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is |
|
|
|
|
* unset, we should default to pick_first). */ |
|
|
|
|
/* Count the number of gRPC-LB addresses. There must be at least one. */ |
|
|
|
|
const grpc_arg *arg = |
|
|
|
|
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); |
|
|
|
|
if (arg == NULL || arg->type != GRPC_ARG_POINTER) { |
|
|
|
@ -1809,6 +1927,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_policy->lb_call_timeout_ms = |
|
|
|
|
grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); |
|
|
|
|
|
|
|
|
|
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); |
|
|
|
|
glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer( |
|
|
|
|
arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, |
|
|
|
|
INT_MAX}); |
|
|
|
|
|
|
|
|
|
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
|
|
|
|
|
// since we use this to trigger the client_load_reporting filter.
|
|
|
|
|
grpc_arg new_arg = grpc_channel_arg_string_create( |
|
|
|
@ -1817,6 +1940,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_policy->args = grpc_channel_args_copy_and_add_and_remove( |
|
|
|
|
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); |
|
|
|
|
|
|
|
|
|
/* Extract the backend addresses (may be empty) from the resolver for
|
|
|
|
|
* fallback. */ |
|
|
|
|
glb_policy->fallback_backend_addresses = |
|
|
|
|
extract_backend_addresses_locked(exec_ctx, addresses); |
|
|
|
|
|
|
|
|
|
/* Create a client channel over them to communicate with a LB service */ |
|
|
|
|
glb_policy->response_generator = |
|
|
|
|
grpc_fake_resolver_response_generator_create(); |
|
|
|
|