|
|
|
@ -52,6 +52,8 @@ |
|
|
|
|
|
|
|
|
|
/* Client channel implementation */ |
|
|
|
|
|
|
|
|
|
grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false); |
|
|
|
|
|
|
|
|
|
/*************************************************************************
|
|
|
|
|
* METHOD-CONFIG TABLE |
|
|
|
|
*/ |
|
|
|
@ -241,6 +243,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand, |
|
|
|
|
grpc_connectivity_state_name(state)); |
|
|
|
|
} |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, |
|
|
|
|
reason); |
|
|
|
|
} |
|
|
|
@ -251,6 +257,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_connectivity_state publish_state = w->state; |
|
|
|
|
/* check if the notification is for the latest policy */ |
|
|
|
|
if (w->lb_policy == w->chand->lb_policy) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, |
|
|
|
|
w->lb_policy, grpc_connectivity_state_name(w->state)); |
|
|
|
|
} |
|
|
|
|
if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { |
|
|
|
|
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; |
|
|
|
|
grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); |
|
|
|
@ -263,7 +273,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); |
|
|
|
|
gpr_free(w); |
|
|
|
|
} |
|
|
|
@ -273,7 +282,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, |
|
|
|
|
grpc_connectivity_state current_state) { |
|
|
|
|
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); |
|
|
|
|
|
|
|
|
|
w->chand = chand; |
|
|
|
|
GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner)); |
|
|
|
@ -283,6 +291,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, |
|
|
|
|
&w->on_changed); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void start_resolving_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
channel_data *chand) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(!chand->started_resolving); |
|
|
|
|
chand->started_resolving = true; |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); |
|
|
|
|
grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result, |
|
|
|
|
&chand->on_resolver_result_changed); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
char *server_name; |
|
|
|
|
grpc_server_retry_throttle_data *retry_throttle_data; |
|
|
|
@ -345,8 +365,13 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { |
|
|
|
|
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, grpc_error *error) { |
|
|
|
|
channel_data *chand = arg; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, |
|
|
|
|
grpc_error_string(error)); |
|
|
|
|
} |
|
|
|
|
// Extract the following fields from the resolver result, if non-NULL.
|
|
|
|
|
char *lb_policy_name = NULL; |
|
|
|
|
bool lb_policy_name_changed = false; |
|
|
|
|
grpc_lb_policy *new_lb_policy = NULL; |
|
|
|
|
char *service_config_json = NULL; |
|
|
|
|
grpc_server_retry_throttle_data *retry_throttle_data = NULL; |
|
|
|
@ -394,10 +419,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
// taking a lock on chand->info_mu, because this function is the
|
|
|
|
|
// only thing that modifies its value, and it can only be invoked
|
|
|
|
|
// once at any given time.
|
|
|
|
|
const bool lb_policy_type_changed = |
|
|
|
|
lb_policy_name_changed = |
|
|
|
|
chand->info_lb_policy_name == NULL || |
|
|
|
|
strcmp(chand->info_lb_policy_name, lb_policy_name) != 0; |
|
|
|
|
if (chand->lb_policy != NULL && !lb_policy_type_changed) { |
|
|
|
|
if (chand->lb_policy != NULL && !lb_policy_name_changed) { |
|
|
|
|
// Continue using the same LB policy. Update with new addresses.
|
|
|
|
|
grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); |
|
|
|
|
} else { |
|
|
|
@ -445,6 +470,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_channel_args_destroy(exec_ctx, chand->resolver_result); |
|
|
|
|
chand->resolver_result = NULL; |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p: resolver result: lb_policy_name=\"%s\"%s, " |
|
|
|
|
"service_config=\"%s\"", |
|
|
|
|
chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "", |
|
|
|
|
service_config_json); |
|
|
|
|
} |
|
|
|
|
// Now swap out fields in chand. Note that the new values may still
|
|
|
|
|
// be NULL if (e.g.) the resolver failed to return results or the
|
|
|
|
|
// results did not contain the necessary data.
|
|
|
|
@ -479,6 +511,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
if (new_lb_policy != NULL || error != GRPC_ERROR_NONE || |
|
|
|
|
chand->resolver == NULL) { |
|
|
|
|
if (chand->lb_policy != NULL) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand, |
|
|
|
|
chand->lb_policy); |
|
|
|
|
} |
|
|
|
|
grpc_pollset_set_del_pollset_set(exec_ctx, |
|
|
|
|
chand->lb_policy->interested_parties, |
|
|
|
|
chand->interested_parties); |
|
|
|
@ -489,7 +525,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
// Now that we've swapped out the relevant fields of chand, check for
|
|
|
|
|
// error or shutdown.
|
|
|
|
|
if (error != GRPC_ERROR_NONE || chand->resolver == NULL) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand); |
|
|
|
|
} |
|
|
|
|
if (chand->resolver != NULL) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand); |
|
|
|
|
} |
|
|
|
|
grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); |
|
|
|
|
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); |
|
|
|
|
chand->resolver = NULL; |
|
|
|
@ -510,6 +552,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_error *state_error = |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); |
|
|
|
|
if (new_lb_policy != NULL) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(state_error); |
|
|
|
|
state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy, |
|
|
|
|
&state_error); |
|
|
|
@ -772,7 +817,9 @@ typedef struct client_channel_call_data { |
|
|
|
|
gpr_atm subchannel_call_or_error; |
|
|
|
|
gpr_arena *arena; |
|
|
|
|
|
|
|
|
|
bool pick_pending; |
|
|
|
|
grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
|
|
|
|
|
grpc_closure lb_pick_closure; |
|
|
|
|
|
|
|
|
|
grpc_connected_subchannel *connected_subchannel; |
|
|
|
|
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; |
|
|
|
|
grpc_polling_entity *pollent; |
|
|
|
@ -837,8 +884,15 @@ static void waiting_for_pick_batches_add_locked( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
call_data *calld, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", |
|
|
|
|
elem->channel_data, calld, calld->waiting_for_pick_batches_count, |
|
|
|
|
grpc_error_string(error)); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { |
|
|
|
|
grpc_transport_stream_op_batch_finish_with_failure( |
|
|
|
|
exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error)); |
|
|
|
@ -848,14 +902,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
call_data *calld) { |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
if (calld->waiting_for_pick_batches_count == 0) return; |
|
|
|
|
call_or_error coe = get_call_or_error(calld); |
|
|
|
|
if (coe.error != GRPC_ERROR_NONE) { |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, calld, |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, elem, |
|
|
|
|
GRPC_ERROR_REF(coe.error)); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR |
|
|
|
|
" pending batches to subchannel_call=%p", |
|
|
|
|
elem->channel_data, calld, calld->waiting_for_pick_batches_count, |
|
|
|
|
coe.subchannel_call); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { |
|
|
|
|
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, |
|
|
|
|
calld->waiting_for_pick_batches[i]); |
|
|
|
@ -869,6 +930,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
if (chand->retry_throttle_data != NULL) { |
|
|
|
|
calld->retry_throttle_data = |
|
|
|
|
grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); |
|
|
|
@ -895,7 +960,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
call_data *calld, grpc_error *error) { |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
grpc_subchannel_call *subchannel_call = NULL; |
|
|
|
|
const grpc_connected_subchannel_call_args call_args = { |
|
|
|
|
.pollent = calld->pollent, |
|
|
|
@ -906,13 +973,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
.context = calld->subchannel_call_context}; |
|
|
|
|
grpc_error *new_error = grpc_connected_subchannel_create_call( |
|
|
|
|
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", |
|
|
|
|
elem->channel_data, calld, subchannel_call, |
|
|
|
|
grpc_error_string(new_error)); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(set_call_or_error( |
|
|
|
|
calld, (call_or_error){.subchannel_call = subchannel_call})); |
|
|
|
|
if (new_error != GRPC_ERROR_NONE) { |
|
|
|
|
new_error = grpc_error_add_child(new_error, error); |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error); |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error); |
|
|
|
|
} else { |
|
|
|
|
waiting_for_pick_batches_resume_locked(exec_ctx, calld); |
|
|
|
|
waiting_for_pick_batches_resume_locked(exec_ctx, elem); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
@ -922,8 +994,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
GPR_ASSERT(calld->pick_pending); |
|
|
|
|
calld->pick_pending = false; |
|
|
|
|
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, |
|
|
|
|
chand->interested_parties); |
|
|
|
|
call_or_error coe = get_call_or_error(calld); |
|
|
|
@ -935,8 +1005,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
"Call dropped by load balancing policy") |
|
|
|
|
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
|
"Failed to create subchannel", &error, 1); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: failed to create subchannel: error=%s", chand, |
|
|
|
|
calld, grpc_error_string(failure)); |
|
|
|
|
} |
|
|
|
|
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure); |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure); |
|
|
|
|
} else if (coe.error != GRPC_ERROR_NONE) { |
|
|
|
|
/* already cancelled before subchannel became ready */ |
|
|
|
|
grpc_error *child_errors[] = {error, coe.error}; |
|
|
|
@ -950,10 +1025,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS, |
|
|
|
|
GRPC_STATUS_DEADLINE_EXCEEDED); |
|
|
|
|
} |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: cancelled before subchannel became ready: %s", |
|
|
|
|
chand, calld, grpc_error_string(cancellation_error)); |
|
|
|
|
} |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error); |
|
|
|
|
} else { |
|
|
|
|
/* Create call on subchannel. */ |
|
|
|
|
create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); |
|
|
|
|
create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
@ -983,41 +1063,77 @@ typedef struct { |
|
|
|
|
grpc_closure closure; |
|
|
|
|
} pick_after_resolver_result_args; |
|
|
|
|
|
|
|
|
|
static void continue_picking_after_resolver_result_locked( |
|
|
|
|
grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
|
static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
pick_after_resolver_result_args *args = arg; |
|
|
|
|
if (args->cancelled) { |
|
|
|
|
/* cancelled, do nothing */ |
|
|
|
|
} else if (error != GRPC_ERROR_NONE) { |
|
|
|
|
subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "call cancelled before resolver result"); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (pick_subchannel_locked(exec_ctx, args->elem)) { |
|
|
|
|
subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); |
|
|
|
|
channel_data *chand = args->elem->channel_data; |
|
|
|
|
call_data *calld = args->elem->call_data; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); |
|
|
|
|
} else { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
if (pick_subchannel_locked(exec_ctx, args->elem)) { |
|
|
|
|
subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
gpr_free(args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: deferring pick pending resolver result", chand, |
|
|
|
|
calld); |
|
|
|
|
} |
|
|
|
|
pick_after_resolver_result_args *args = |
|
|
|
|
(pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); |
|
|
|
|
args->elem = elem; |
|
|
|
|
GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, |
|
|
|
|
args, grpc_combiner_scheduler(chand->combiner)); |
|
|
|
|
grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, |
|
|
|
|
&args->closure, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
if (chand->lb_policy != NULL) { |
|
|
|
|
grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, |
|
|
|
|
&calld->connected_subchannel, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
// If we don't yet have a resolver result, then a closure for
|
|
|
|
|
// continue_picking_after_resolver_result_locked() will have been added to
|
|
|
|
|
// pick_after_resolver_result_done_locked() will have been added to
|
|
|
|
|
// chand->waiting_for_resolver_result_closures, and it may not be invoked
|
|
|
|
|
// until after this call has been destroyed. We mark the operation as
|
|
|
|
|
// cancelled, so that when continue_picking_after_resolver_result_locked()
|
|
|
|
|
// cancelled, so that when pick_after_resolver_result_done_locked()
|
|
|
|
|
// is called, it will be a no-op. We also immediately invoke
|
|
|
|
|
// subchannel_ready_locked() to propagate the error back to the caller.
|
|
|
|
|
for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head; |
|
|
|
|
closure != NULL; closure = closure->next_data.next) { |
|
|
|
|
pick_after_resolver_result_args *args = closure->cb_arg; |
|
|
|
|
if (!args->cancelled && args->elem == elem) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: " |
|
|
|
|
"cancelling pick waiting for resolver result", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
args->cancelled = true; |
|
|
|
|
subchannel_ready_locked(exec_ctx, elem, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
@ -1027,24 +1143,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// State for pick callback that holds a reference to the LB policy
|
|
|
|
|
// from which the pick was requested.
|
|
|
|
|
typedef struct { |
|
|
|
|
grpc_lb_policy *lb_policy; |
|
|
|
|
grpc_call_element *elem; |
|
|
|
|
grpc_closure closure; |
|
|
|
|
} pick_callback_args; |
|
|
|
|
|
|
|
|
|
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
|
|
|
|
|
// Unrefs the LB policy after invoking subchannel_ready_locked().
|
|
|
|
|
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
pick_callback_args *args = arg; |
|
|
|
|
GPR_ASSERT(args != NULL); |
|
|
|
|
GPR_ASSERT(args->lb_policy != NULL); |
|
|
|
|
subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel"); |
|
|
|
|
gpr_free(args); |
|
|
|
|
grpc_call_element *elem = arg; |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(calld->lb_policy != NULL); |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); |
|
|
|
|
calld->lb_policy = NULL; |
|
|
|
|
subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
|
|
|
|
@ -1055,23 +1168,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
const grpc_lb_policy_pick_args *inputs) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args)); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", |
|
|
|
|
chand, calld, chand->lb_policy); |
|
|
|
|
} |
|
|
|
|
// Keep a ref to the LB policy in calld while the pick is pending.
|
|
|
|
|
GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); |
|
|
|
|
pick_args->lb_policy = chand->lb_policy; |
|
|
|
|
pick_args->elem = elem; |
|
|
|
|
GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args, |
|
|
|
|
calld->lb_policy = chand->lb_policy; |
|
|
|
|
GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner)); |
|
|
|
|
const bool pick_done = grpc_lb_policy_pick_locked( |
|
|
|
|
exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel, |
|
|
|
|
calld->subchannel_call_context, NULL, &pick_args->closure); |
|
|
|
|
calld->subchannel_call_context, NULL, &calld->lb_pick_closure); |
|
|
|
|
if (pick_done) { |
|
|
|
|
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel"); |
|
|
|
|
gpr_free(pick_args); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); |
|
|
|
|
calld->lb_policy = NULL; |
|
|
|
|
} |
|
|
|
|
return pick_done; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
GPR_ASSERT(calld->lb_policy != NULL); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", |
|
|
|
|
chand, calld, calld->lb_policy); |
|
|
|
|
} |
|
|
|
|
grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, |
|
|
|
|
&calld->connected_subchannel, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
GPR_TIMER_BEGIN("pick_subchannel", 0); |
|
|
|
@ -1107,20 +1241,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs); |
|
|
|
|
} else if (chand->resolver != NULL) { |
|
|
|
|
if (!chand->started_resolving) { |
|
|
|
|
chand->started_resolving = true; |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); |
|
|
|
|
grpc_resolver_next_locked(exec_ctx, chand->resolver, |
|
|
|
|
&chand->resolver_result, |
|
|
|
|
&chand->on_resolver_result_changed); |
|
|
|
|
start_resolving_locked(exec_ctx, chand); |
|
|
|
|
} |
|
|
|
|
pick_after_resolver_result_args *args = |
|
|
|
|
(pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); |
|
|
|
|
args->elem = elem; |
|
|
|
|
GRPC_CLOSURE_INIT(&args->closure, |
|
|
|
|
continue_picking_after_resolver_result_locked, args, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner)); |
|
|
|
|
grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, |
|
|
|
|
&args->closure, GRPC_ERROR_NONE); |
|
|
|
|
pick_after_resolver_result_start_locked(exec_ctx, elem); |
|
|
|
|
} else { |
|
|
|
|
subchannel_ready_locked( |
|
|
|
|
exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); |
|
|
|
@ -1133,63 +1256,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); |
|
|
|
|
grpc_transport_stream_op_batch *op = arg; |
|
|
|
|
grpc_call_element *elem = op->handler_private.extra_arg; |
|
|
|
|
grpc_transport_stream_op_batch *batch = arg; |
|
|
|
|
grpc_call_element *elem = batch->handler_private.extra_arg; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
/* need to recheck that another thread hasn't set the call */ |
|
|
|
|
call_or_error coe = get_call_or_error(calld); |
|
|
|
|
if (coe.error != GRPC_ERROR_NONE) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", |
|
|
|
|
chand, calld, grpc_error_string(coe.error)); |
|
|
|
|
} |
|
|
|
|
grpc_transport_stream_op_batch_finish_with_failure( |
|
|
|
|
exec_ctx, op, GRPC_ERROR_REF(coe.error)); |
|
|
|
|
exec_ctx, batch, GRPC_ERROR_REF(coe.error)); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
if (coe.subchannel_call != NULL) { |
|
|
|
|
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: sending batch to subchannel_call=%p", chand, |
|
|
|
|
calld, coe.subchannel_call); |
|
|
|
|
} |
|
|
|
|
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
// Add to waiting-for-pick list. If we succeed in getting a
|
|
|
|
|
// subchannel call below, we'll handle this batch (along with any
|
|
|
|
|
// other waiting batches) in waiting_for_pick_batches_resume_locked().
|
|
|
|
|
waiting_for_pick_batches_add_locked(calld, op); |
|
|
|
|
/* if this is a cancellation, then we can raise our cancelled flag */ |
|
|
|
|
if (op->cancel_stream) { |
|
|
|
|
grpc_error *error = op->payload->cancel_stream.cancel_error; |
|
|
|
|
waiting_for_pick_batches_add_locked(calld, batch); |
|
|
|
|
// If this is a cancellation, cancel the pending pick (if any) and
|
|
|
|
|
// fail any pending batches.
|
|
|
|
|
if (batch->cancel_stream) { |
|
|
|
|
grpc_error *error = batch->payload->cancel_stream.cancel_error; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, |
|
|
|
|
calld, grpc_error_string(error)); |
|
|
|
|
} |
|
|
|
|
/* Stash a copy of cancel_error in our call data, so that we can use
|
|
|
|
|
it for subsequent operations. This ensures that if the call is |
|
|
|
|
cancelled before any ops are passed down (e.g., if the deadline |
|
|
|
|
cancelled before any batches are passed down (e.g., if the deadline |
|
|
|
|
is in the past when the call starts), we can return the right |
|
|
|
|
error to the caller when the first op does get passed down. */ |
|
|
|
|
error to the caller when the first batch does get passed down. */ |
|
|
|
|
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); |
|
|
|
|
if (calld->pick_pending) { |
|
|
|
|
cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); |
|
|
|
|
if (calld->lb_policy != NULL) { |
|
|
|
|
pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); |
|
|
|
|
} else { |
|
|
|
|
pick_after_resolver_result_cancel_locked(exec_ctx, elem, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, calld, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
/* if we don't have a subchannel, try to get one */ |
|
|
|
|
if (!calld->pick_pending && calld->connected_subchannel == NULL && |
|
|
|
|
op->send_initial_metadata) { |
|
|
|
|
calld->initial_metadata_payload = op->payload; |
|
|
|
|
calld->pick_pending = true; |
|
|
|
|
if (batch->send_initial_metadata) { |
|
|
|
|
GPR_ASSERT(calld->connected_subchannel == NULL); |
|
|
|
|
calld->initial_metadata_payload = batch->payload; |
|
|
|
|
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); |
|
|
|
|
/* If a subchannel is not available immediately, the polling entity from
|
|
|
|
|
call_data should be provided to channel_data's interested_parties, so |
|
|
|
|
that IO of the lb_policy and resolver could be done under it. */ |
|
|
|
|
if (pick_subchannel_locked(exec_ctx, elem)) { |
|
|
|
|
// Pick was returned synchronously.
|
|
|
|
|
calld->pick_pending = false; |
|
|
|
|
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); |
|
|
|
|
if (calld->connected_subchannel == NULL) { |
|
|
|
|
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"Call dropped by load balancing policy"); |
|
|
|
|
set_call_or_error(calld, |
|
|
|
|
(call_or_error){.error = GRPC_ERROR_REF(error)}); |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, calld, error); |
|
|
|
|
waiting_for_pick_batches_fail_locked(exec_ctx, elem, error); |
|
|
|
|
} else { |
|
|
|
|
// Create subchannel call.
|
|
|
|
|
create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE); |
|
|
|
|
create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, |
|
|
|
@ -1232,47 +1369,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
|
If it has, we proceed on the fast path. */ |
|
|
|
|
static void cc_start_transport_stream_op_batch( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
|
|
|
|
grpc_transport_stream_op_batch *op) { |
|
|
|
|
grpc_transport_stream_op_batch *batch) { |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
GRPC_CALL_LOG_OP(GPR_INFO, elem, op); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace) || |
|
|
|
|
GRPC_TRACER_ON(grpc_trace_channel)) { |
|
|
|
|
grpc_call_log_op(GPR_INFO, elem, batch); |
|
|
|
|
} |
|
|
|
|
if (chand->deadline_checking_enabled) { |
|
|
|
|
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, |
|
|
|
|
op); |
|
|
|
|
batch); |
|
|
|
|
} |
|
|
|
|
// Intercept on_complete for recv_trailing_metadata so that we can
|
|
|
|
|
// check retry throttle status.
|
|
|
|
|
if (op->recv_trailing_metadata) { |
|
|
|
|
GPR_ASSERT(op->on_complete != NULL); |
|
|
|
|
calld->original_on_complete = op->on_complete; |
|
|
|
|
if (batch->recv_trailing_metadata) { |
|
|
|
|
GPR_ASSERT(batch->on_complete != NULL); |
|
|
|
|
calld->original_on_complete = batch->on_complete; |
|
|
|
|
GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
op->on_complete = &calld->on_complete; |
|
|
|
|
batch->on_complete = &calld->on_complete; |
|
|
|
|
} |
|
|
|
|
/* try to (atomically) get the call */ |
|
|
|
|
call_or_error coe = get_call_or_error(calld); |
|
|
|
|
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); |
|
|
|
|
if (coe.error != GRPC_ERROR_NONE) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", |
|
|
|
|
chand, calld, grpc_error_string(coe.error)); |
|
|
|
|
} |
|
|
|
|
grpc_transport_stream_op_batch_finish_with_failure( |
|
|
|
|
exec_ctx, op, GRPC_ERROR_REF(coe.error)); |
|
|
|
|
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
exec_ctx, batch, GRPC_ERROR_REF(coe.error)); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
if (coe.subchannel_call != NULL) { |
|
|
|
|
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); |
|
|
|
|
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: sending batch to subchannel_call=%p", chand, |
|
|
|
|
calld, coe.subchannel_call); |
|
|
|
|
} |
|
|
|
|
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
/* we failed; lock and figure out what to do */ |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld); |
|
|
|
|
} |
|
|
|
|
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); |
|
|
|
|
op->handler_private.extra_arg = elem; |
|
|
|
|
batch->handler_private.extra_arg = elem; |
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, |
|
|
|
|
start_transport_stream_op_batch_locked, op, |
|
|
|
|
exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
|
|
|
|
start_transport_stream_op_batch_locked, batch, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner)), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
done: |
|
|
|
|
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1317,7 +1466,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, |
|
|
|
|
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, |
|
|
|
|
"client_channel_destroy_call"); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(!calld->pick_pending); |
|
|
|
|
GPR_ASSERT(calld->lb_policy == NULL); |
|
|
|
|
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); |
|
|
|
|
if (calld->connected_subchannel != NULL) { |
|
|
|
|
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, |
|
|
|
@ -1366,11 +1515,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
} else { |
|
|
|
|
chand->exit_idle_when_lb_policy_arrives = true; |
|
|
|
|
if (!chand->started_resolving && chand->resolver != NULL) { |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); |
|
|
|
|
chand->started_resolving = true; |
|
|
|
|
grpc_resolver_next_locked(exec_ctx, chand->resolver, |
|
|
|
|
&chand->resolver_result, |
|
|
|
|
&chand->on_resolver_result_changed); |
|
|
|
|
start_resolving_locked(exec_ctx, chand); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); |
|
|
|
|