diff --git a/doc/environment_variables.md b/doc/environment_variables.md index 0a289ac94d4..339b705252b 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -42,6 +42,8 @@ some configuration as environment variables that can be set. - bdp_estimator - traces behavior of bdp estimation logic - call_error - traces the possible errors contributing to final call status - channel - traces operations on the C core channel stack + - client_channel - traces client channel activity, including resolver + and load balancing policy interaction - combiner - traces combiner lock state - compression - traces compression operations - connectivity_state - traces connectivity state changes to channels diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index de516ab4c97..7add4325895 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -52,6 +52,8 @@ /* Client channel implementation */ +grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false); + /************************************************************************* * METHOD-CONFIG TABLE */ @@ -241,6 +243,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_REF(error)); } } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand, + grpc_connectivity_state_name(state)); + } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); } @@ -251,6 +257,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, grpc_connectivity_state publish_state = w->state; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, + w->lb_policy, 
grpc_connectivity_state_name(w->state)); + } if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); @@ -263,7 +273,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); } } - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } @@ -273,7 +282,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); - w->chand = chand; GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, grpc_combiner_scheduler(chand->combiner)); @@ -283,6 +291,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +static void start_resolving_locked(grpc_exec_ctx *exec_ctx, + channel_data *chand) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); + } + GPR_ASSERT(!chand->started_resolving); + chand->started_resolving = true; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); +} + typedef struct { char *server_name; grpc_server_retry_throttle_data *retry_throttle_data; @@ -345,8 +365,13 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, + grpc_error_string(error)); + } // Extract the following fields from 
the resolver result, if non-NULL. char *lb_policy_name = NULL; + bool lb_policy_name_changed = false; grpc_lb_policy *new_lb_policy = NULL; char *service_config_json = NULL; grpc_server_retry_throttle_data *retry_throttle_data = NULL; @@ -394,10 +419,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // taking a lock on chand->info_mu, because this function is the // only thing that modifies its value, and it can only be invoked // once at any given time. - const bool lb_policy_type_changed = + lb_policy_name_changed = chand->info_lb_policy_name == NULL || strcmp(chand->info_lb_policy_name, lb_policy_name) != 0; - if (chand->lb_policy != NULL && !lb_policy_type_changed) { + if (chand->lb_policy != NULL && !lb_policy_name_changed) { // Continue using the same LB policy. Update with new addresses. grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); } else { @@ -445,6 +470,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p: resolver result: lb_policy_name=\"%s\"%s, " + "service_config=\"%s\"", + chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "", + service_config_json); + } // Now swap out fields in chand. Note that the new values may still // be NULL if (e.g.) the resolver failed to return results or the // results did not contain the necessary data. 
@@ -479,6 +511,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, if (new_lb_policy != NULL || error != GRPC_ERROR_NONE || chand->resolver == NULL) { if (chand->lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand, + chand->lb_policy); + } grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, chand->interested_parties); @@ -489,7 +525,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // Now that we've swapped out the relevant fields of chand, check for // error or shutdown. if (error != GRPC_ERROR_NONE || chand->resolver == NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand); + } if (chand->resolver != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand); + } grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; @@ -510,6 +552,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_error *state_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); if (new_lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand); + } GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy, &state_error); @@ -772,7 +817,9 @@ typedef struct client_channel_call_data { gpr_atm subchannel_call_or_error; gpr_arena *arena; - bool pick_pending; + grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending. 
+ grpc_closure lb_pick_closure; + grpc_connected_subchannel *connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; @@ -837,8 +884,15 @@ static void waiting_for_pick_batches_add_locked( } static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, - call_data *calld, + grpc_call_element *elem, grpc_error *error) { + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + grpc_error_string(error)); + } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error)); @@ -848,14 +902,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, } static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx, - call_data *calld) { + grpc_call_element *elem) { + call_data *calld = elem->call_data; if (calld->waiting_for_pick_batches_count == 0) return; call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { - waiting_for_pick_batches_fail_locked(exec_ctx, calld, + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(coe.error)); return; } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR + " pending batches to subchannel_call=%p", + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + coe.subchannel_call); + } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, calld->waiting_for_pick_batches[i]); @@ -869,6 +930,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { channel_data *chand = elem->channel_data; 
call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", + chand, calld); + } if (chand->retry_throttle_data != NULL) { calld->retry_throttle_data = grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); @@ -895,7 +960,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, } static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, - call_data *calld, grpc_error *error) { + grpc_call_element *elem, + grpc_error *error) { + call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call = NULL; const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, @@ -906,13 +973,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", + elem->channel_data, calld, subchannel_call, + grpc_error_string(new_error)); + } GPR_ASSERT(set_call_or_error( calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error); } else { - waiting_for_pick_batches_resume_locked(exec_ctx, calld); + waiting_for_pick_batches_resume_locked(exec_ctx, elem); } GRPC_ERROR_UNREF(error); } @@ -922,8 +994,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, grpc_error *error) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(calld->pick_pending); - calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, 
calld->pollent, chand->interested_parties); call_or_error coe = get_call_or_error(calld); @@ -935,8 +1005,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, "Call dropped by load balancing policy") : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to create subchannel", &error, 1); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failed to create subchannel: error=%s", chand, + calld, grpc_error_string(failure)); + } set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure); } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ grpc_error *child_errors[] = {error, coe.error}; @@ -950,10 +1025,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED); } - waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: cancelled before subchannel became ready: %s", + chand, calld, grpc_error_string(cancellation_error)); + } + waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error); } else { /* Create call on subchannel. 
*/ - create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); GRPC_ERROR_UNREF(error); @@ -983,41 +1063,77 @@ typedef struct { grpc_closure closure; } pick_after_resolver_result_args; -static void continue_picking_after_resolver_result_locked( - grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error) { pick_after_resolver_result_args *args = arg; if (args->cancelled) { /* cancelled, do nothing */ - } else if (error != GRPC_ERROR_NONE) { - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "call cancelled before resolver result"); + } } else { - if (pick_subchannel_locked(exec_ctx, args->elem)) { - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); + channel_data *chand = args->elem->channel_data; + call_data *calld = args->elem->call_data; + if (error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", + chand, calld); + } + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + } else { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", + chand, calld); + } + if (pick_subchannel_locked(exec_ctx, args->elem)) { + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); + } } } gpr_free(args); } -static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_error *error) { +static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if 
(GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: deferring pick pending resolver result", chand, + calld); + } + pick_after_resolver_result_args *args = + (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); + args->elem = elem; + GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, + args, grpc_combiner_scheduler(chand->combiner)); + grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, + &args->closure, GRPC_ERROR_NONE); +} + +static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, - &calld->connected_subchannel, - GRPC_ERROR_REF(error)); - } // If we don't yet have a resolver result, then a closure for - // continue_picking_after_resolver_result_locked() will have been added to + // pick_after_resolver_result_done_locked() will have been added to // chand->waiting_for_resolver_result_closures, and it may not be invoked // until after this call has been destroyed. We mark the operation as - // cancelled, so that when continue_picking_after_resolver_result_locked() + // cancelled, so that when pick_after_resolver_result_done_locked() // is called, it will be a no-op. We also immediately invoke // subchannel_ready_locked() to propagate the error back to the caller. 
for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head; closure != NULL; closure = closure->next_data.next) { pick_after_resolver_result_args *args = closure->cb_arg; if (!args->cancelled && args->elem == elem) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: " + "cancelling pick waiting for resolver result", + chand, calld); + } args->cancelled = true; subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -1027,24 +1143,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_UNREF(error); } -// State for pick callback that holds a reference to the LB policy -// from which the pick was requested. -typedef struct { - grpc_lb_policy *lb_policy; - grpc_call_element *elem; - grpc_closure closure; -} pick_callback_args; - // Callback invoked by grpc_lb_policy_pick_locked() for async picks. -// Unrefs the LB policy after invoking subchannel_ready_locked(). +// Unrefs the LB policy before invoking subchannel_ready_locked(). static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - pick_callback_args *args = arg; - GPR_ASSERT(args != NULL); - GPR_ASSERT(args->lb_policy != NULL); - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); - GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel"); - gpr_free(args); + grpc_call_element *elem = arg; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", + chand, calld); + } + GPR_ASSERT(calld->lb_policy != NULL); + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; + subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } // Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). 
@@ -1055,23 +1168,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, const grpc_lb_policy_pick_args *inputs) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", + chand, calld, chand->lb_policy); + } + // Keep a ref to the LB policy in calld while the pick is pending. GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); - pick_args->lb_policy = chand->lb_policy; - pick_args->elem = elem; - GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args, + calld->lb_policy = chand->lb_policy; + GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); const bool pick_done = grpc_lb_policy_pick_locked( exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel, - calld->subchannel_call_context, NULL, &pick_args->closure); + calld->subchannel_call_context, NULL, &calld->lb_pick_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. 
*/ - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel"); - gpr_free(pick_args); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", + chand, calld); + } + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; } return pick_done; } +static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + GPR_ASSERT(calld->lb_policy != NULL); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", + chand, calld, calld->lb_policy); + } + grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, + &calld->connected_subchannel, error); +} + static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { GPR_TIMER_BEGIN("pick_subchannel", 0); @@ -1107,20 +1241,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs); } else if (chand->resolver != NULL) { if (!chand->started_resolving) { - chand->started_resolving = true; - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); + start_resolving_locked(exec_ctx, chand); } - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); - args->elem = elem; - GRPC_CLOSURE_INIT(&args->closure, - continue_picking_after_resolver_result_locked, args, - grpc_combiner_scheduler(chand->combiner)); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &args->closure, GRPC_ERROR_NONE); + pick_after_resolver_result_start_locked(exec_ctx, elem); } else { subchannel_ready_locked( exec_ctx, elem, 
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); @@ -1133,63 +1256,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); - grpc_transport_stream_op_batch *op = arg; - grpc_call_element *elem = op->handler_private.extra_arg; + grpc_transport_stream_op_batch *batch = arg; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; /* need to recheck that another thread hasn't set the call */ call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(coe.error)); + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); goto done; } if (coe.subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); goto done; } // Add to waiting-for-pick list. If we succeed in getting a // subchannel call below, we'll handle this batch (along with any // other waiting batches) in waiting_for_pick_batches_resume_locked(). - waiting_for_pick_batches_add_locked(calld, op); - /* if this is a cancellation, then we can raise our cancelled flag */ - if (op->cancel_stream) { - grpc_error *error = op->payload->cancel_stream.cancel_error; + waiting_for_pick_batches_add_locked(calld, batch); + // If this is a cancellation, cancel the pending pick (if any) and + // fail any pending batches. 
+ if (batch->cancel_stream) { + grpc_error *error = batch->payload->cancel_stream.cancel_error; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, + calld, grpc_error_string(error)); + } /* Stash a copy of cancel_error in our call data, so that we can use it for subsequent operations. This ensures that if the call is - cancelled before any ops are passed down (e.g., if the deadline + cancelled before any batches are passed down (e.g., if the deadline is in the past when the call starts), we can return the right - error to the caller when the first op does get passed down. */ + error to the caller when the first batch does get passed down. */ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - if (calld->pick_pending) { - cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + if (calld->lb_policy != NULL) { + pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + } else { + pick_after_resolver_result_cancel_locked(exec_ctx, elem, + GRPC_ERROR_REF(error)); } - waiting_for_pick_batches_fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(error)); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); goto done; } /* if we don't have a subchannel, try to get one */ - if (!calld->pick_pending && calld->connected_subchannel == NULL && - op->send_initial_metadata) { - calld->initial_metadata_payload = op->payload; - calld->pick_pending = true; + if (batch->send_initial_metadata) { + GPR_ASSERT(calld->connected_subchannel == NULL); + calld->initial_metadata_payload = batch->payload; GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ if (pick_subchannel_locked(exec_ctx, elem)) { // Pick was returned synchronously. 
- calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, error); } else { // Create subchannel call. - create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE); + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE); } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, @@ -1232,47 +1369,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { If it has, we proceed on the fast path. */ static void cc_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + if (GRPC_TRACER_ON(grpc_client_channel_trace) || + GRPC_TRACER_ON(grpc_trace_channel)) { + grpc_call_log_op(GPR_INFO, elem, batch); + } if (chand->deadline_checking_enabled) { grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, - op); + batch); } // Intercept on_complete for recv_trailing_metadata so that we can // check retry throttle status. 
- if (op->recv_trailing_metadata) { - GPR_ASSERT(op->on_complete != NULL); - calld->original_on_complete = op->on_complete; + if (batch->recv_trailing_metadata) { + GPR_ASSERT(batch->on_complete != NULL); + calld->original_on_complete = batch->on_complete; GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, grpc_schedule_on_exec_ctx); - op->on_complete = &calld->on_complete; + batch->on_complete = &calld->on_complete; } /* try to (atomically) get the call */ call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(coe.error)); - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); - /* early out */ - return; + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); + goto done; } if (coe.subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); - /* early out */ - return; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); + goto done; } /* we failed; lock and figure out what to do */ + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld); + } GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); - op->handler_private.extra_arg = elem; + batch->handler_private.extra_arg = elem; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, - start_transport_stream_op_batch_locked, op, + exec_ctx, 
GRPC_CLOSURE_INIT(&batch->handler_private.closure, + start_transport_stream_op_batch_locked, batch, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); +done: GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1317,7 +1466,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, "client_channel_destroy_call"); } - GPR_ASSERT(!calld->pick_pending); + GPR_ASSERT(calld->lb_policy == NULL); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1366,11 +1515,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { chand->exit_idle_when_lb_policy_arrives = true; if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); + start_resolving_locked(exec_ctx, chand); } } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 63f7c299403..c99f0092e9b 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -23,6 +23,8 @@ #include "src/core/ext/filters/client_channel/resolver.h" #include "src/core/lib/channel/channel_stack.h" +extern grpc_tracer_flag grpc_client_channel_trace; + // Channel arg key for server URI string. 
#define GRPC_ARG_SERVER_URI "grpc.server_uri" diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c index 60e77d62683..6f133a648b6 100644 --- a/src/core/ext/filters/client_channel/client_channel_plugin.c +++ b/src/core/ext/filters/client_channel/client_channel_plugin.c @@ -78,6 +78,7 @@ void grpc_client_channel_init(void) { GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter, (void *)&grpc_client_channel_filter); grpc_http_connect_register_handshaker_factory(); + grpc_register_tracer("client_channel", &grpc_client_channel_trace); #ifndef NDEBUG grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount); #endif