Clean up client_channel code and eliminate unnecessary allocations.

pull/11352/head
Mark D. Roth 8 years ago
parent b82bfa4e95
commit 0ca0be8b56
  1. 643
      src/core/ext/filters/client_channel/client_channel.c
  2. 4
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
  3. 4
      src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c

@ -178,8 +178,8 @@ typedef struct client_channel_channel_data {
grpc_slice_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
grpc_channel_args *resolver_result;
/** a list of closures that are all waiting for config to come in */
grpc_closure_list waiting_for_config_closures;
/** a list of closures that are all waiting for resolver result to come in */
grpc_closure_list waiting_for_resolver_result_closures;
/** resolver callback */
grpc_closure on_resolver_result_changed;
/** connectivity state being tracked */
@ -342,49 +342,15 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}
// Wrap a closure associated with \a lb_policy. The associated callback (\a
// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
// scheduling \a wrapped_closure.
typedef struct wrapped_on_pick_closure_arg {
/* the closure instance using this struct as argument */
grpc_closure wrapper_closure;
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;
/* The policy instance related to the closure */
grpc_lb_policy *lb_policy;
} wrapped_on_pick_closure_arg;
// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
wrapped_on_pick_closure_arg *wc_arg = arg;
GPR_ASSERT(wc_arg != NULL);
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
GPR_ASSERT(wc_arg->lb_policy != NULL);
GRPC_CLOSURE_RUN(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
gpr_free(wc_arg);
}
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
// Extract the following fields from the resolver result, if non-NULL.
char *lb_policy_name = NULL;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy = NULL;
grpc_slice_hash_table *method_params_table = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
grpc_error *state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
grpc_lb_policy *new_lb_policy = NULL;
char *service_config_json = NULL;
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
bool lb_policy_updated = false;
grpc_server_retry_throttle_data *retry_throttle_data = NULL;
grpc_slice_hash_table *method_params_table = NULL;
if (chand->resolver_result != NULL) {
// Find LB policy name.
const grpc_arg *channel_arg =
@ -419,32 +385,29 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == NULL) lb_policy_name = "pick_first";
// Instantiate LB policy.
grpc_lb_policy_args lb_policy_args;
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
// Check to see if we're already using the right LB policy.
// Note: It's safe to use chand->info_lb_policy_name here without
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
const bool lb_policy_type_changed =
(chand->info_lb_policy_name == NULL) ||
(strcmp(chand->info_lb_policy_name, lb_policy_name) != 0);
chand->info_lb_policy_name == NULL ||
strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
if (chand->lb_policy != NULL && !lb_policy_type_changed) {
// update
lb_policy_updated = true;
// Continue using the same LB policy. Update with new addresses.
grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
} else {
lb_policy =
// Instantiate new LB policy.
new_lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error);
state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
&state_error);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (new_lb_policy == NULL) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
}
}
// Find service config.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
@ -461,12 +424,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_uri *uri =
grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
parsing_state.server_name =
uri->path[0] == '/' ? uri->path + 1 : uri->path;
grpc_service_config_parse_global_params(
service_config, parse_retry_throttle_params, &parsing_state);
parsing_state.server_name = NULL;
grpc_uri_destroy(uri);
retry_throttle_data = parsing_state.retry_throttle_data;
method_params_table = grpc_service_config_create_method_config_table(
exec_ctx, service_config, method_parameters_create_from_json,
method_parameters_free);
@ -480,12 +445,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
chand->resolver_result = NULL;
}
if (lb_policy != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
chand->interested_parties);
}
// Now swap out fields in chand. Note that the new values may still
// be NULL if (e.g.) the resolver failed to return results or the
// results did not contain the necessary data.
//
// First, swap out the data used by cc_get_channel_info().
gpr_mu_lock(&chand->info_mu);
if (lb_policy_name != NULL) {
gpr_free(chand->info_lb_policy_name);
@ -496,75 +460,77 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
chand->info_service_config_json = service_config_json;
}
gpr_mu_unlock(&chand->info_mu);
// Swap out the retry throttle data.
if (chand->retry_throttle_data != NULL) {
grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
}
chand->retry_throttle_data = parsing_state.retry_throttle_data;
chand->retry_throttle_data = retry_throttle_data;
// Swap out the method params table.
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
chand->method_params_table = method_params_table;
if (lb_policy != NULL) {
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
} else if (chand->resolver == NULL /* disconnected */) {
grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Channel disconnected", &error, 1));
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
}
if (!lb_policy_updated && lb_policy != NULL &&
chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
exit_idle = true;
chand->exit_idle_when_lb_policy_arrives = false;
}
if (error == GRPC_ERROR_NONE && chand->resolver) {
if (!lb_policy_updated) {
set_channel_connectivity_state_locked(exec_ctx, chand, state,
GRPC_ERROR_REF(state_error),
"new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
// If we have a new LB policy or are shutting down (in which case
// new_lb_policy will be NULL), swap out the LB policy, unreffing the
// old one and removing its fds from chand->interested_parties.
// Note that we do NOT do this if either (a) we updated the existing
// LB policy above or (b) we failed to create the new LB policy (in
// which case we want to continue using the most recent one we had).
if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
chand->resolver == NULL) {
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
chand->lb_policy = new_lb_policy;
}
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next_locked(exec_ctx, chand->resolver,
&chand->resolver_result,
&chand->on_resolver_result_changed);
} else {
// Now that we've swapped out the relevant fields of chand, check for
// error or shutdown.
if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
if (chand->resolver != NULL) {
grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
grpc_error *refs[] = {error, state_error};
set_channel_connectivity_state_locked(
exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)),
"Got resolver result after disconnection", &error, 1),
"resolver_gone");
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Channel disconnected", &error, 1));
GRPC_CLOSURE_LIST_SCHED(exec_ctx,
&chand->waiting_for_resolver_result_closures);
} else { // Not shutting down.
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error *state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
if (new_lb_policy != NULL) {
GRPC_ERROR_UNREF(state_error);
state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
&state_error);
grpc_pollset_set_add_pollset_set(exec_ctx,
new_lb_policy->interested_parties,
chand->interested_parties);
GRPC_CLOSURE_LIST_SCHED(exec_ctx,
&chand->waiting_for_resolver_result_closures);
if (chand->exit_idle_when_lb_policy_arrives) {
grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
chand->exit_idle_when_lb_policy_arrives = false;
}
if (!lb_policy_updated && lb_policy != NULL && exit_idle) {
grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
}
if (old_lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(
exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
old_lb_policy = NULL;
}
if (!lb_policy_updated && lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
set_channel_connectivity_state_locked(
exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
grpc_resolver_next_locked(exec_ctx, chand->resolver,
&chand->resolver_result,
&chand->on_resolver_result_changed);
GRPC_ERROR_UNREF(state_error);
}
}
static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
@ -602,9 +568,10 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
if (!chand->started_resolving) {
grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
GRPC_ERROR_REF(op->disconnect_with_error));
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
GRPC_CLOSURE_LIST_SCHED(exec_ctx,
&chand->waiting_for_resolver_result_closures);
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
@ -770,6 +737,16 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
* PER-CALL FUNCTIONS
*/
// Max number of batches that can be pending on a call at any given
// time. This includes:
// recv_initial_metadata
// send_initial_metadata
// recv_message
// send_message
// recv_trailing_metadata
// send_trailing_metadata
#define MAX_WAITING_BATCHES 6
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
Handles queueing of stream ops until a call object is ready, waiting
@ -800,11 +777,10 @@ typedef struct client_channel_call_data {
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity *pollent;
grpc_transport_stream_op_batch **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
size_t waiting_for_pick_batches_count;
grpc_closure next_step;
grpc_transport_stream_op_batch_payload *initial_metadata_payload;
grpc_call_stack *owning_call;
@ -853,57 +829,44 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
return get_call_or_error(call_elem->call_data).subchannel_call;
}
static void add_waiting_locked(call_data *calld,
grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("add_waiting_locked", 0);
if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
calld->waiting_ops =
gpr_realloc(calld->waiting_ops,
calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
}
calld->waiting_ops[calld->waiting_ops_count++] = op;
GPR_TIMER_END("add_waiting_locked", 0);
static void waiting_for_pick_batches_add_locked(
call_data *calld, grpc_transport_stream_op_batch *batch) {
GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
batch;
}
static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
call_data *calld,
grpc_error *error) {
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
calld->waiting_for_pick_batches_count = 0;
GRPC_ERROR_UNREF(error);
}
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
if (calld->waiting_ops_count == 0) {
return;
}
call_or_error call = get_call_or_error(calld);
grpc_transport_stream_op_batch **ops = calld->waiting_ops;
size_t nops = calld->waiting_ops_count;
if (call.error != GRPC_ERROR_NONE) {
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error));
static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
call_data *calld) {
if (calld->waiting_for_pick_batches_count == 0) return;
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
waiting_for_pick_batches_fail_locked(exec_ctx, calld,
GRPC_ERROR_REF(coe.error));
return;
}
calld->waiting_ops = NULL;
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
for (size_t i = 0; i < nops; i++) {
grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]);
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
calld->waiting_for_pick_batches[i]);
}
gpr_free(ops);
calld->waiting_for_pick_batches_count = 0;
}
// Sets calld->method_params and calld->retry_throttle_data.
// If the method params specify a timeout, populates
// *per_method_deadline and returns true.
static bool set_call_method_params_from_service_config_locked(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
gpr_timespec *per_method_deadline) {
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
if (chand->retry_throttle_data != NULL) {
@ -915,39 +878,48 @@ static bool set_call_method_params_from_service_config_locked(
exec_ctx, chand->method_params_table, calld->path);
if (calld->method_params != NULL) {
method_parameters_ref(calld->method_params);
if (gpr_time_cmp(calld->method_params->timeout,
// If the deadline from the service config is shorter than the one
// from the client API, reset the deadline timer.
if (chand->deadline_checking_enabled &&
gpr_time_cmp(calld->method_params->timeout,
gpr_time_0(GPR_TIMESPAN)) != 0) {
*per_method_deadline =
const gpr_timespec per_method_deadline =
gpr_time_add(calld->call_start_time, calld->method_params->timeout);
return true;
if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
calld->deadline = per_method_deadline;
grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
}
}
}
}
return false;
}
static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
/* apply service-config level configuration to the call (now that we're
* certain it exists) */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_timespec per_method_deadline;
if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
&per_method_deadline)) {
// If the deadline from the service config is shorter than the one
// from the client API, reset the deadline timer.
if (chand->deadline_checking_enabled &&
gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
calld->deadline = per_method_deadline;
grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
}
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
call_data *calld, grpc_error *error) {
grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
GPR_ASSERT(set_call_or_error(
calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error);
} else {
waiting_for_pick_batches_resume_locked(exec_ctx, calld);
}
GRPC_ERROR_UNREF(error);
}
static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_error *error) {
grpc_call_element *elem = arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(calld->pick_pending);
@ -956,6 +928,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
chand->interested_parties);
call_or_error coe = get_call_or_error(calld);
if (calld->connected_subchannel == NULL) {
// Failed to create subchannel.
grpc_error *failure =
error == GRPC_ERROR_NONE
? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@ -963,7 +936,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to create subchannel", &error, 1);
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
fail_locked(exec_ctx, calld, failure);
waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure);
} else if (coe.error != GRPC_ERROR_NONE) {
/* already cancelled before subchannel became ready */
grpc_error *child_errors[] = {error, coe.error};
@ -977,29 +950,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_DEADLINE_EXCEEDED);
}
fail_locked(exec_ctx, calld, cancellation_error);
waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error);
} else {
/* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
GPR_ASSERT(set_call_or_error(
calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
fail_locked(exec_ctx, calld, new_error);
} else {
retry_waiting_locked(exec_ctx, calld);
}
create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
GRPC_ERROR_UNREF(error);
}
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
@ -1013,41 +970,32 @@ static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
}
}
/** Return true if subchannel is available immediately (in which case
subchannel_ready_locked() should not be called), or false otherwise (in
which case subchannel_ready_locked() should be called when the subchannel
is available). */
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem);
typedef struct {
grpc_metadata_batch *initial_metadata;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_call_context_element *subchannel_call_context;
grpc_closure *on_ready;
grpc_call_element *elem;
bool cancelled;
grpc_closure closure;
} continue_picking_args;
/** Return true if subchannel is available immediately (in which case on_ready
should not be called), or false otherwise (in which case on_ready should be
called when the subchannel is available). */
static bool pick_subchannel_locked(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready);
} pick_after_resolver_result_args;
static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
continue_picking_args *cpa = arg;
if (cpa->connected_subchannel == NULL) {
static void continue_picking_after_resolver_result_locked(
grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
pick_after_resolver_result_args *args = arg;
if (args->cancelled) {
/* cancelled, do nothing */
} else if (error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
} else {
if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel,
cpa->subchannel_call_context, cpa->on_ready)) {
GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
if (pick_subchannel_locked(exec_ctx, args->elem)) {
subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
}
}
gpr_free(cpa);
gpr_free(args);
}
static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
@ -1059,12 +1007,19 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
&calld->connected_subchannel,
GRPC_ERROR_REF(error));
}
for (grpc_closure *closure = chand->waiting_for_config_closures.head;
// If we don't yet have a resolver result, then a closure for
// continue_picking_after_resolver_result_locked() will have been added to
// chand->waiting_for_resolver_result_closures, and it may not be invoked
// until after this call has been destroyed. We mark the operation as
// cancelled, so that when continue_picking_after_resolver_result_locked()
// is called, it will be a no-op. We also immediately invoke
// subchannel_ready_locked() to propagate the error back to the caller.
for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
closure != NULL; closure = closure->next_data.next) {
continue_picking_args *cpa = closure->cb_arg;
if (cpa->connected_subchannel == &calld->connected_subchannel) {
cpa->connected_subchannel = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready,
pick_after_resolver_result_args *args = closure->cb_arg;
if (!args->cancelled && args->elem == elem) {
args->cancelled = true;
subchannel_ready_locked(exec_ctx, elem,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
}
@ -1072,26 +1027,65 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GRPC_ERROR_UNREF(error);
}
static bool pick_subchannel_locked(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_call_context_element *subchannel_call_context,
grpc_closure *on_ready) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
// State for pick callback that holds a reference to the LB policy
// from which the pick was requested.
typedef struct {
grpc_lb_policy *lb_policy;
grpc_call_element *elem;
grpc_closure closure;
} pick_callback_args;
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy after invoking subchannel_ready_locked().
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
pick_callback_args *args = arg;
GPR_ASSERT(args != NULL);
GPR_ASSERT(args->lb_policy != NULL);
subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel");
gpr_free(args);
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
// If the pick was completed synchronously, unrefs the LB policy and
// returns true.
static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_lb_policy_pick_args *inputs) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args));
GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
pick_args->lb_policy = chand->lb_policy;
pick_args->elem = elem;
GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args,
grpc_combiner_scheduler(chand->combiner));
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
calld->subchannel_call_context, NULL, &pick_args->closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel");
gpr_free(pick_args);
}
return pick_done;
}
GPR_ASSERT(connected_subchannel);
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
bool pick_done = false;
if (chand->lb_policy != NULL) {
apply_final_configuration_locked(exec_ctx, elem);
grpc_lb_policy *lb_policy = chand->lb_policy;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
apply_service_config_to_call_locked(exec_ctx, elem);
// If the application explicitly set wait_for_ready, use that.
// Otherwise, if the service config specified a value for this
// method, use that.
uint32_t initial_metadata_flags =
calld->initial_metadata_payload->send_initial_metadata
.send_initial_metadata_flags;
const bool wait_for_ready_set_from_api =
initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
@ -1107,78 +1101,57 @@ static bool pick_subchannel_locked(
}
}
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem};
// Wrap the user-provided callback in order to hold a strong reference to
// the LB policy for the duration of the pick.
wrapped_on_pick_closure_arg *w_on_pick_arg =
gpr_zalloc(sizeof(*w_on_pick_arg));
GRPC_CLOSURE_INIT(&w_on_pick_arg->wrapper_closure,
wrapped_on_pick_closure_cb, w_on_pick_arg,
grpc_schedule_on_exec_ctx);
w_on_pick_arg->wrapped_closure = on_ready;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
w_on_pick_arg->lb_policy = lb_policy;
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, lb_policy, &inputs, connected_subchannel,
subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
"pick_subchannel_wrapping");
gpr_free(w_on_pick_arg);
}
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return pick_done;
}
if (chand->resolver != NULL && !chand->started_resolving) {
calld->initial_metadata_payload->send_initial_metadata
.send_initial_metadata,
initial_metadata_flags, &calld->lb_token_mdelem};
pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
} else if (chand->resolver != NULL) {
if (!chand->started_resolving) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next_locked(exec_ctx, chand->resolver,
&chand->resolver_result,
&chand->on_resolver_result_changed);
}
if (chand->resolver != NULL) {
continue_picking_args *cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->subchannel_call_context = subchannel_call_context;
cpa->on_ready = on_ready;
cpa->elem = elem;
GRPC_CLOSURE_INIT(&cpa->closure, continue_picking_locked, cpa,
pick_after_resolver_result_args *args =
(pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
args->elem = elem;
GRPC_CLOSURE_INIT(&args->closure,
continue_picking_after_resolver_result_locked, args,
grpc_combiner_scheduler(chand->combiner));
grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
GRPC_ERROR_NONE);
grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
&args->closure, GRPC_ERROR_NONE);
} else {
GRPC_CLOSURE_SCHED(exec_ctx, on_ready,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
subchannel_ready_locked(
exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
}
GPR_TIMER_END("pick_subchannel", 0);
return false;
return pick_done;
}
static void start_transport_stream_op_batch_locked_inner(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
grpc_transport_stream_op_batch *op = arg;
grpc_call_element *elem = op->handler_private.extra_arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
/* need to recheck that another thread hasn't set the call */
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(coe.error));
/* early out */
return;
goto done;
}
if (coe.subchannel_call != NULL) {
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
/* early out */
return;
goto done;
}
// Add to waiting-for-pick list. If we succeed in getting a
// subchannel call below, we'll handle this batch (along with any
// other waiting batches) in waiting_for_pick_batches_resume_locked().
waiting_for_pick_batches_add_locked(calld, op);
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_stream) {
grpc_error *error = op->payload->cancel_stream.cancel_error;
@ -1190,30 +1163,22 @@ static void start_transport_stream_op_batch_locked_inner(
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
if (calld->pick_pending) {
cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
} else {
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
}
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op,
waiting_for_pick_batches_fail_locked(exec_ctx, calld,
GRPC_ERROR_REF(error));
/* early out */
return;
goto done;
}
/* if we don't have a subchannel, try to get one */
if (!calld->pick_pending && calld->connected_subchannel == NULL &&
op->send_initial_metadata) {
calld->initial_metadata_payload = op->payload;
calld->pick_pending = true;
GRPC_CLOSURE_INIT(&calld->next_step, subchannel_ready_locked, elem,
grpc_combiner_scheduler(chand->combiner));
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
that IO of the lb_policy and resolver could be done under it. */
if (pick_subchannel_locked(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata,
op->payload->send_initial_metadata.send_initial_metadata_flags,
&calld->connected_subchannel, calld->subchannel_call_context,
&calld->next_step)) {
if (pick_subchannel_locked(exec_ctx, elem)) {
// Pick was returned synchronously.
calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
if (calld->connected_subchannel == NULL) {
@ -1221,42 +1186,20 @@ static void start_transport_stream_op_batch_locked_inner(
"Call dropped by load balancing policy");
set_call_or_error(calld,
(call_or_error){.error = GRPC_ERROR_REF(error)});
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return; // Early out.
waiting_for_pick_batches_fail_locked(exec_ctx, calld, error);
} else {
// Create subchannel call.
create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE);
}
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
}
}
/* if we've got a subchannel, then let's ask it to create a call */
if (!calld->pick_pending && calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena,
.context = calld->subchannel_call_context};
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
GPR_ASSERT(set_call_or_error(
calld, (call_or_error){.subchannel_call = subchannel_call}));
if (error != GRPC_ERROR_NONE) {
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
} else {
retry_waiting_locked(exec_ctx, calld);
/* recurse to retry */
start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
}
/* early out */
return;
}
/* nothing to be done but wait */
add_waiting_locked(calld, op);
done:
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
"start_transport_stream_op_batch");
GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@ -1279,30 +1222,6 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_REF(error));
}
static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
grpc_transport_stream_op_batch *op = arg;
grpc_call_element *elem = op->handler_private.extra_arg;
call_data *calld = elem->call_data;
if (op->recv_trailing_metadata) {
GPR_ASSERT(op->on_complete != NULL);
calld->original_on_complete = op->on_complete;
GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
grpc_schedule_on_exec_ctx);
op->on_complete = &calld->on_complete;
}
start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
"start_transport_stream_op_batch");
GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
/* The logic here is fairly complicated, due to (a) the fact that we
need to handle the case where we receive the send op before the
initial metadata op, and (b) the need for efficiency, especially in
@ -1321,6 +1240,15 @@ static void cc_start_transport_stream_op_batch(
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
op);
}
// Intercept on_complete for recv_trailing_metadata so that we can
// check retry throttle status.
if (op->recv_trailing_metadata) {
GPR_ASSERT(op->on_complete != NULL);
calld->original_on_complete = op->on_complete;
GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
grpc_schedule_on_exec_ctx);
op->on_complete = &calld->on_complete;
}
/* try to (atomically) get the call */
call_or_error coe = get_call_or_error(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
@ -1390,7 +1318,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
"client_channel_destroy_call");
}
GPR_ASSERT(!calld->pick_pending);
GPR_ASSERT(calld->waiting_ops_count == 0);
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->connected_subchannel != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
"picked");
@ -1401,7 +1329,6 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
calld->subchannel_call_context[i].value);
}
}
gpr_free(calld->waiting_ops);
GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}

@ -74,7 +74,9 @@ static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx,
fake_resolver* r = (fake_resolver*)resolver;
if (r->next_completion != NULL) {
*r->target_result = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(
exec_ctx, r->next_completion,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown"));
r->next_completion = NULL;
}
}

@ -73,7 +73,9 @@ static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
if (r->next_completion != NULL) {
*r->target_result = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(
exec_ctx, r->next_completion,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown"));
r->next_completion = NULL;
}
}

Loading…
Cancel
Save