|
|
|
@ -491,11 +491,8 @@ static grpc_lb_addresses *process_serverlist_locked( |
|
|
|
|
for (size_t i = 0; i < serverlist->num_servers; ++i) { |
|
|
|
|
if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; |
|
|
|
|
} |
|
|
|
|
if (num_valid == 0) return NULL; |
|
|
|
|
|
|
|
|
|
grpc_lb_addresses *lb_addresses = |
|
|
|
|
grpc_lb_addresses_create(num_valid, &lb_token_vtable); |
|
|
|
|
|
|
|
|
|
/* second pass: actually populate the addresses and LB tokens (aka user data
|
|
|
|
|
* to the outside world) to be read by the RR policy during its creation. |
|
|
|
|
* Given that the validity tests are very cheap, they are performed again |
|
|
|
@ -503,14 +500,12 @@ static grpc_lb_addresses *process_serverlist_locked( |
|
|
|
|
* incurr in an allocation due to the arbitrary number of server */ |
|
|
|
|
size_t addr_idx = 0; |
|
|
|
|
for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { |
|
|
|
|
GPR_ASSERT(addr_idx < num_valid); |
|
|
|
|
const grpc_grpclb_server *server = serverlist->servers[sl_idx]; |
|
|
|
|
if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(addr_idx < num_valid); |
|
|
|
|
/* address processing */ |
|
|
|
|
grpc_resolved_address addr; |
|
|
|
|
parse_server(server, &addr); |
|
|
|
|
|
|
|
|
|
/* lb token processing */ |
|
|
|
|
void *user_data; |
|
|
|
|
if (server->has_load_balance_token) { |
|
|
|
@ -596,7 +591,7 @@ static void update_lb_connectivity_status_locked( |
|
|
|
|
grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy); |
|
|
|
|
} |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, |
|
|
|
|
GRPC_ERROR_REF(rr_state_error), |
|
|
|
|
rr_state_error, |
|
|
|
|
"update_lb_connectivity_status_locked"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -678,11 +673,12 @@ static bool pick_from_internal_rr_locked( |
|
|
|
|
|
|
|
|
|
static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_lb_policy *glb_policy) { |
|
|
|
|
grpc_lb_addresses *addresses = |
|
|
|
|
process_serverlist_locked(exec_ctx, glb_policy->serverlist); |
|
|
|
|
GPR_ASSERT(addresses != NULL); |
|
|
|
|
grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args)); |
|
|
|
|
args->client_channel_factory = glb_policy->cc_factory; |
|
|
|
|
args->combiner = glb_policy->base.combiner; |
|
|
|
|
grpc_lb_addresses *addresses = |
|
|
|
|
process_serverlist_locked(exec_ctx, glb_policy->serverlist); |
|
|
|
|
// Replace the LB addresses in the channel args that we pass down to
|
|
|
|
|
// the subchannel.
|
|
|
|
|
static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; |
|
|
|
@ -727,7 +723,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
/* Connectivity state is a function of the RR policy updated/created */ |
|
|
|
|
update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state, |
|
|
|
|
rr_state_error); |
|
|
|
|
|
|
|
|
|
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
|
|
|
|
|
* created RR policy. This will make the RR policy progress upon activity on |
|
|
|
|
* gRPC LB, which in turn is tied to the application's call */ |
|
|
|
@ -761,8 +756,8 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
pp->wrapped_on_complete_arg.client_stats = |
|
|
|
|
grpc_grpclb_client_stats_ref(glb_policy->client_stats); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", |
|
|
|
|
(intptr_t)glb_policy->rr_policy); |
|
|
|
|
gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p", |
|
|
|
|
(void *)glb_policy->rr_policy); |
|
|
|
|
} |
|
|
|
|
pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, |
|
|
|
|
true /* force_async */, pp->target, |
|
|
|
@ -788,10 +783,9 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_lb_policy *glb_policy) { |
|
|
|
|
GPR_ASSERT(glb_policy->serverlist != NULL && |
|
|
|
|
glb_policy->serverlist->num_servers > 0); |
|
|
|
|
|
|
|
|
|
if (glb_policy->shutting_down) return; |
|
|
|
|
|
|
|
|
|
grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); |
|
|
|
|
GPR_ASSERT(args != NULL); |
|
|
|
|
if (glb_policy->rr_policy != NULL) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", |
|
|
|
@ -826,8 +820,8 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
unref_needed = true; |
|
|
|
|
gpr_free(rr_connectivity); |
|
|
|
|
} else { /* rr state != SHUTDOWN && !shutting down: biz as usual */ |
|
|
|
|
update_lb_connectivity_status_locked(exec_ctx, glb_policy, |
|
|
|
|
rr_connectivity->state, error); |
|
|
|
|
update_lb_connectivity_status_locked( |
|
|
|
|
exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); |
|
|
|
|
/* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ |
|
|
|
|
grpc_lb_policy_notify_on_state_change_locked( |
|
|
|
|
exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, |
|
|
|
@ -1089,6 +1083,16 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Cancel a specific pending pick.
|
|
|
|
|
//
|
|
|
|
|
// A grpclb pick progresses as follows:
|
|
|
|
|
// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
|
|
|
|
|
// handed over to the RR policy (in create_rr_locked()). From that point
|
|
|
|
|
// onwards, it'll be RR's responsibility. For cancellations, that implies the
|
|
|
|
|
// pick needs also be cancelled by the RR instance.
|
|
|
|
|
// - Otherwise, without an RR instance, picks stay pending at this policy's
|
|
|
|
|
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
|
|
|
|
|
// we invoke the completion closure and set *target to NULL right here.
|
|
|
|
|
static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
grpc_connected_subchannel **target, |
|
|
|
|
grpc_error *error) { |
|
|
|
@ -1108,9 +1112,23 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
} |
|
|
|
|
pp = next; |
|
|
|
|
} |
|
|
|
|
if (glb_policy->rr_policy != NULL) { |
|
|
|
|
grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Cancel all pending picks.
|
|
|
|
|
//
|
|
|
|
|
// A grpclb pick progresses as follows:
|
|
|
|
|
// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
|
|
|
|
|
// handed over to the RR policy (in create_rr_locked()). From that point
|
|
|
|
|
// onwards, it'll be RR's responsibility. For cancellations, that implies the
|
|
|
|
|
// pick needs also be cancelled by the RR instance.
|
|
|
|
|
// - Otherwise, without an RR instance, picks stay pending at this policy's
|
|
|
|
|
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
|
|
|
|
|
// we invoke the completion closure and set *target to NULL right here.
|
|
|
|
|
static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_lb_policy *pol, |
|
|
|
|
uint32_t initial_metadata_flags_mask, |
|
|
|
@ -1132,6 +1150,11 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
pp = next; |
|
|
|
|
} |
|
|
|
|
if (glb_policy->rr_policy != NULL) { |
|
|
|
|
grpc_lb_policy_cancel_picks_locked( |
|
|
|
|
exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask, |
|
|
|
|
initial_metadata_flags_eq, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1463,7 +1486,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
op++; |
|
|
|
|
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
|
|
|
|
|
* count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, |
|
|
|
|
"lb_on_sent_initial_request_locked"); |
|
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), |
|
|
|
|
&glb_policy->lb_on_sent_initial_request); |
|
|
|
@ -1480,8 +1504,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
op->reserved = NULL; |
|
|
|
|
op++; |
|
|
|
|
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
|
|
|
|
|
* count goes to zero) to be unref'd in lb_on_server_status_received */ |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); |
|
|
|
|
* count goes to zero) to be unref'd in lb_on_server_status_received_locked */ |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, |
|
|
|
|
"lb_on_server_status_received_locked"); |
|
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), |
|
|
|
|
&glb_policy->lb_on_server_status_received); |
|
|
|
@ -1493,8 +1518,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
op->flags = 0; |
|
|
|
|
op->reserved = NULL; |
|
|
|
|
op++; |
|
|
|
|
/* take another weak ref to be unref'd in lb_on_response_received */ |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received"); |
|
|
|
|
/* take another weak ref to be unref'd/reused in
|
|
|
|
|
* lb_on_response_received_locked */ |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked"); |
|
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), |
|
|
|
|
&glb_policy->lb_on_response_received); |
|
|
|
@ -1511,13 +1537,12 @@ static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
do_send_client_load_report_locked(exec_ctx, glb_policy); |
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, |
|
|
|
|
"lb_on_response_received_locked"); |
|
|
|
|
"lb_on_sent_initial_request_locked"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
glb_lb_policy *glb_policy = arg; |
|
|
|
|
|
|
|
|
|
grpc_op ops[2]; |
|
|
|
|
memset(ops, 0, sizeof(ops)); |
|
|
|
|
grpc_op *op = ops; |
|
|
|
@ -1548,7 +1573,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
} |
|
|
|
|
/* take a weak ref (won't prevent calling of \a glb_shutdown() if the
|
|
|
|
|
* strong ref count goes to zero) to be unref'd in |
|
|
|
|
* send_client_load_report() */ |
|
|
|
|
* send_client_load_report_locked() */ |
|
|
|
|
glb_policy->client_load_report_timer_pending = true; |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); |
|
|
|
|
schedule_next_client_load_report(exec_ctx, glb_policy); |
|
|
|
@ -1576,7 +1601,6 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
gpr_free(ipport); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* update serverlist */ |
|
|
|
|
if (serverlist->num_servers > 0) { |
|
|
|
|
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, |
|
|
|
@ -1611,9 +1635,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_slice_unref_internal(exec_ctx, response_slice); |
|
|
|
|
|
|
|
|
|
if (!glb_policy->shutting_down) { |
|
|
|
|
/* keep listening for serverlist updates */ |
|
|
|
|
op->op = GRPC_OP_RECV_MESSAGE; |
|
|
|
@ -1621,7 +1643,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
op->flags = 0; |
|
|
|
|
op->reserved = NULL; |
|
|
|
|
op++; |
|
|
|
|
/* reuse the "lb_on_response_received" weak ref taken in
|
|
|
|
|
/* reuse the "lb_on_response_received_locked" weak ref taken in
|
|
|
|
|
* query_for_backends_locked() */ |
|
|
|
|
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), |
|
|
|
@ -1629,10 +1651,10 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
} |
|
|
|
|
} else { /* empty payload: call cancelled. */ |
|
|
|
|
/* dispose of the "lb_on_response_received" weak ref taken in
|
|
|
|
|
/* dispose of the "lb_on_response_received_locked" weak ref taken in
|
|
|
|
|
* query_for_backends_locked() and reused in every reception loop */ |
|
|
|
|
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, |
|
|
|
|
"lb_on_response_received_empty_payload"); |
|
|
|
|
"lb_on_response_received_locked_empty_payload"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1699,7 +1721,7 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
&glb_policy->lb_on_call_retry, now); |
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, |
|
|
|
|
"lb_on_server_status_received"); |
|
|
|
|
"lb_on_server_status_received_locked"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, |
|
|
|
|