@@ -55,8 +55,8 @@
 
 /* Client channel implementation */
 
-grpc_tracer_flag grpc_client_channel_trace =
-    GRPC_TRACER_INITIALIZER(false, "client_channel");
+grpc_core::Tracer grpc_client_channel_trace
+    (false, "client_channel");
 
 /*************************************************************************
  * METHOD-CONFIG TABLE
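Every hunk below applies the same mechanical rewrite at each trace point: the C-style GRPC_TRACER_ON(grpc_client_channel_trace) macro check becomes a call to the tracer object's enabled() method. For orientation, here is a minimal sketch of what the grpc_core::Tracer type could look like; only the (bool, const char*) constructor and the enabled() accessor are taken from this diff, while the field layout, the name() accessor, and any hookup to gRPC's tracer registry are assumptions:

    namespace grpc_core {

    // Sketch only: a tracer pairs a registered name with an on/off flag.
    // The constructor signature and enabled() mirror the call sites in
    // this diff; everything else here is hypothetical.
    class Tracer {
     public:
      Tracer(bool default_enabled, const char* name)
          : enabled_(default_enabled), name_(name) {}

      bool enabled() const { return enabled_; }
      const char* name() const { return name_; }

     private:
      bool enabled_;
      const char* name_;
    };

    }  // namespace grpc_core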
@@ -250,7 +250,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
                                          GRPC_ERROR_REF(error));
     }
   }
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
             grpc_connectivity_state_name(state));
   }
@@ -264,7 +264,7 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
   grpc_connectivity_state publish_state = w->state;
   /* check if the notification is for the latest policy */
   if (w->lb_policy == w->chand->lb_policy) {
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
               w->lb_policy, grpc_connectivity_state_name(w->state));
     }
@@ -301,7 +301,7 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
 
 static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
                                    channel_data *chand) {
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
   }
   GPR_ASSERT(!chand->started_resolving);
@@ -374,7 +374,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
 static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
                                               void *arg, grpc_error *error) {
   channel_data *chand = (channel_data *)arg;
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
             grpc_error_string(error));
   }
@@ -483,7 +483,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
     grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
     chand->resolver_result = NULL;
   }
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG,
             "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
             "service_config=\"%s\"",
@@ -524,7 +524,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
   if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
       chand->resolver == NULL) {
     if (chand->lb_policy != NULL) {
-      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+      if (grpc_client_channel_trace.enabled()) {
         gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
                 chand->lb_policy);
       }
@@ -538,11 +538,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
   // Now that we've swapped out the relevant fields of chand, check for
   // error or shutdown.
   if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
     }
     if (chand->resolver != NULL) {
-      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+      if (grpc_client_channel_trace.enabled()) {
         gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
       }
       grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
@@ -565,7 +565,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
     grpc_error *state_error =
         GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
     if (new_lb_policy != NULL) {
-      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
       }
       GRPC_ERROR_UNREF(state_error);
@@ -896,7 +896,7 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
                                           grpc_call_element *elem,
                                           grpc_error *error) {
   call_data *calld = (call_data *)elem->call_data;
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG,
             "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
             elem->channel_data, calld, calld->waiting_for_pick_batches_count,
@@ -939,7 +939,7 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
                                             grpc_call_element *elem) {
   channel_data *chand = (channel_data *)elem->channel_data;
   call_data *calld = (call_data *)elem->call_data;
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
                        " pending batches to subchannel_call=%p",
             chand, calld, calld->waiting_for_pick_batches_count,
@@ -965,7 +965,7 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
                                                 grpc_call_element *elem) {
   channel_data *chand = (channel_data *)elem->channel_data;
   call_data *calld = (call_data *)elem->call_data;
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
             chand, calld);
   }
@@ -1011,7 +1011,7 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
   grpc_error *new_error = grpc_connected_subchannel_create_call(
       exec_ctx, calld->connected_subchannel, &call_args,
       &calld->subchannel_call);
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
             chand, calld, calld->subchannel_call, grpc_error_string(new_error));
   }
@@ -1037,7 +1037,7 @@ static void pick_done_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                              "Call dropped by load balancing policy")
                        : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                              "Failed to create subchannel", &error, 1);
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG,
               "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
               calld, grpc_error_string(calld->error));
@@ -1071,7 +1071,7 @@ static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg,
   channel_data *chand = (channel_data *)elem->channel_data;
   call_data *calld = (call_data *)elem->call_data;
   if (calld->lb_policy != NULL) {
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
               chand, calld, calld->lb_policy);
     }
@@ -1089,7 +1089,7 @@ static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
   grpc_call_element *elem = (grpc_call_element *)arg;
   channel_data *chand = (channel_data *)elem->channel_data;
   call_data *calld = (call_data *)elem->call_data;
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
             chand, calld);
   }
@@ -1106,7 +1106,7 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
                                        grpc_call_element *elem) {
   channel_data *chand = (channel_data *)elem->channel_data;
   call_data *calld = (call_data *)elem->call_data;
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
             chand, calld, chand->lb_policy);
   }
@@ -1144,7 +1144,7 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
       calld->subchannel_call_context, NULL, &calld->lb_pick_closure);
   if (pick_done) {
     /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
               chand, calld);
     }
@@ -1190,7 +1190,7 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
   grpc_call_element *elem = args->elem;
   channel_data *chand = (channel_data *)elem->channel_data;
   call_data *calld = (call_data *)elem->call_data;
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG,
             "chand=%p calld=%p: cancelling pick waiting for resolver result",
             chand, calld);
@@ -1212,7 +1212,7 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
       (pick_after_resolver_result_args *)arg;
   if (args->finished) {
     /* cancelled, do nothing */
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "call cancelled before resolver result");
     }
     gpr_free(args);
@@ -1223,13 +1223,13 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
   channel_data *chand = (channel_data *)elem->channel_data;
   call_data *calld = (call_data *)elem->call_data;
   if (error != GRPC_ERROR_NONE) {
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
               chand, calld);
     }
     async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
   } else {
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
               chand, calld);
     }
@@ -1248,7 +1248,7 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
                                                     grpc_call_element *elem) {
   channel_data *chand = (channel_data *)elem->channel_data;
   call_data *calld = (call_data *)elem->call_data;
-  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+  if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG,
             "chand=%p calld=%p: deferring pick pending resolver result", chand,
             calld);
@@ -1333,7 +1333,7 @@ static void cc_start_transport_stream_op_batch(
   GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
   // If we've previously been cancelled, immediately fail any new batches.
   if (calld->error != GRPC_ERROR_NONE) {
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
               chand, calld, grpc_error_string(calld->error));
     }
@@ -1349,7 +1349,7 @@ static void cc_start_transport_stream_op_batch(
     // error to the caller when the first batch does get passed down.
     GRPC_ERROR_UNREF(calld->error);
     calld->error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
               calld, grpc_error_string(calld->error));
     }
@@ -1378,7 +1378,7 @@ static void cc_start_transport_stream_op_batch(
   // the channel combiner, which is more efficient (especially for
   // streaming calls).
   if (calld->subchannel_call != NULL) {
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG,
               "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
               calld, calld->subchannel_call);
@@ -1392,7 +1392,7 @@ static void cc_start_transport_stream_op_batch(
   // For batches containing a send_initial_metadata op, enter the channel
   // combiner to start a pick.
   if (batch->send_initial_metadata) {
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering client_channel combiner",
               chand, calld);
     }
@@ -1403,7 +1403,7 @@ static void cc_start_transport_stream_op_batch(
         GRPC_ERROR_NONE);
   } else {
     // For all other batches, release the call combiner.
-    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+    if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG,
               "chand=%p calld=%p: saved batch, yeilding call combiner", chand,
               calld);
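Note that every call site keeps its guard, so the gpr_log arguments are still evaluated only when tracing is on, and runtime control should be unchanged: gRPC tracers are toggled by listing their registered names in the GRPC_TRACE environment variable (here, GRPC_TRACE=client_channel), assuming the new class registers its name the same way the old flag did. A hypothetical call site in another translation unit would follow the same shape as the hunks above:

    #include <grpc/support/log.h>

    // Hypothetical external usage; the extern declaration is an assumption
    // about how the tracer is exported from this file.
    extern grpc_core::Tracer grpc_client_channel_trace;

    void log_example(void* chand) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p: example trace point", chand);
      }
    }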