@@ -76,29 +76,29 @@ typedef struct {
   wait_for_ready_value wait_for_ready;
 } method_parameters;
 
-static method_parameters *method_parameters_ref(
-    method_parameters *method_params) {
+static method_parameters* method_parameters_ref(
+    method_parameters* method_params) {
   gpr_ref(&method_params->refs);
   return method_params;
 }
 
-static void method_parameters_unref(method_parameters *method_params) {
+static void method_parameters_unref(method_parameters* method_params) {
   if (gpr_unref(&method_params->refs)) {
     gpr_free(method_params);
   }
 }
 
 // Wrappers to pass to grpc_service_config_create_method_config_table().
-static void *method_parameters_ref_wrapper(void *value) {
-  return method_parameters_ref((method_parameters *)value);
+static void* method_parameters_ref_wrapper(void* value) {
+  return method_parameters_ref((method_parameters*)value);
 }
-static void method_parameters_unref_wrapper(grpc_exec_ctx *exec_ctx,
-                                            void *value) {
-  method_parameters_unref((method_parameters *)value);
+static void method_parameters_unref_wrapper(grpc_exec_ctx* exec_ctx,
+                                            void* value) {
+  method_parameters_unref((method_parameters*)value);
 }
 
-static bool parse_wait_for_ready(grpc_json *field,
-                                 wait_for_ready_value *wait_for_ready) {
+static bool parse_wait_for_ready(grpc_json* field,
+                                 wait_for_ready_value* wait_for_ready) {
   if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
     return false;
   }
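Note on the hunk above: method_parameters is reference-counted with gpr_ref()/gpr_unref(), and the *_wrapper functions exist only to adapt those typed helpers to the void*-based callbacks that grpc_service_config_create_method_config_table() works with. A minimal sketch of that adapter pattern, using a hypothetical ref-counted type `widget` (the type and function names are illustrative, not from this patch; the gpr_* calls are the same support APIs used above):

    typedef struct {
      gpr_refcount refs;
      int payload;
    } widget;

    // Typed helpers: take and drop a reference.
    static widget* widget_ref(widget* w) {
      gpr_ref(&w->refs);
      return w;
    }
    static void widget_unref(widget* w) {
      // gpr_unref() returns nonzero when the last reference is dropped.
      if (gpr_unref(&w->refs)) gpr_free(w);
    }

    // void* adapters matching the generic copy/destroy callbacks a table expects.
    static void* widget_ref_wrapper(void* value) {
      return widget_ref((widget*)value);
    }
    static void widget_unref_wrapper(grpc_exec_ctx* exec_ctx, void* value) {
      widget_unref((widget*)value);
    }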
@@ -107,13 +107,13 @@ static bool parse_wait_for_ready(grpc_json *field,
   return true;
 }
 
-static bool parse_timeout(grpc_json *field, grpc_millis *timeout) {
+static bool parse_timeout(grpc_json* field, grpc_millis* timeout) {
   if (field->type != GRPC_JSON_STRING) return false;
   size_t len = strlen(field->value);
   if (field->value[len - 1] != 's') return false;
-  char *buf = gpr_strdup(field->value);
+  char* buf = gpr_strdup(field->value);
   buf[len - 1] = '\0';  // Remove trailing 's'.
-  char *decimal_point = strchr(buf, '.');
+  char* decimal_point = strchr(buf, '.');
   int nanos = 0;
   if (decimal_point != NULL) {
     *decimal_point = '\0';
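parse_timeout() above consumes the JSON service-config duration syntax: a decimal number of seconds with a trailing 's' (for example "30s" or "1.5s"). The whole part becomes seconds, the fractional part is scaled out to nanoseconds, and the file then folds the result into a single grpc_millis value. A rough standalone sketch of that string split, in plain C with an illustrative helper name (not code from this patch):

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>

    // Parses "12s" or "1.5s" into whole seconds plus nanoseconds.
    static bool parse_duration(const char* s, int64_t* seconds, int32_t* nanos) {
      size_t len = strlen(s);
      if (len < 2 || s[len - 1] != 's') return false;  // must end in 's'
      char buf[64];
      if (len >= sizeof(buf)) return false;
      memcpy(buf, s, len - 1);  // copy without the trailing 's'
      buf[len - 1] = '\0';
      *seconds = 0;
      *nanos = 0;
      char* dot = strchr(buf, '.');
      if (dot != NULL) {
        *dot = '\0';  // split "1.5" into "1" and "5"
        int32_t frac = 0;
        int32_t scale = 1000000000;  // first fractional digit is worth 1e8 ns
        for (const char* p = dot + 1; *p != '\0'; ++p) {
          if (*p < '0' || *p > '9' || scale < 10) return false;  // at most 9 digits
          scale /= 10;
          frac += (int32_t)(*p - '0') * scale;
        }
        *nanos = frac;
      }
      char* end = NULL;
      *seconds = strtoll(buf, &end, 10);
      return end != buf && *end == '\0';
    }

For example, "1.5s" yields seconds = 1 and nanos = 500000000.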
@@ -138,10 +138,10 @@ static bool parse_timeout(grpc_json *field, grpc_millis *timeout) {
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void *method_parameters_create_from_json(const grpc_json *json) { |
|
|
|
|
static void* method_parameters_create_from_json(const grpc_json* json) { |
|
|
|
|
wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; |
|
|
|
|
grpc_millis timeout = 0; |
|
|
|
|
for (grpc_json *field = json->child; field != NULL; field = field->next) { |
|
|
|
|
for (grpc_json* field = json->child; field != NULL; field = field->next) { |
|
|
|
|
if (field->key == NULL) continue; |
|
|
|
|
if (strcmp(field->key, "waitForReady") == 0) { |
|
|
|
|
if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
|
|
|
|
@@ -151,8 +151,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
|
|
|
|
if (!parse_timeout(field, &timeout)) return NULL; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
method_parameters *value = |
|
|
|
|
(method_parameters *)gpr_malloc(sizeof(method_parameters)); |
|
|
|
|
method_parameters* value = |
|
|
|
|
(method_parameters*)gpr_malloc(sizeof(method_parameters)); |
|
|
|
|
gpr_ref_init(&value->refs, 1); |
|
|
|
|
value->timeout = timeout; |
|
|
|
|
value->wait_for_ready = wait_for_ready; |
|
|
|
@@ -167,24 +167,24 @@ struct external_connectivity_watcher;
|
|
|
|
|
|
|
|
|
typedef struct client_channel_channel_data { |
|
|
|
|
/** resolver for this channel */ |
|
|
|
|
grpc_resolver *resolver; |
|
|
|
|
grpc_resolver* resolver; |
|
|
|
|
/** have we started resolving this channel */ |
|
|
|
|
bool started_resolving; |
|
|
|
|
/** is deadline checking enabled? */ |
|
|
|
|
bool deadline_checking_enabled; |
|
|
|
|
/** client channel factory */ |
|
|
|
|
grpc_client_channel_factory *client_channel_factory; |
|
|
|
|
grpc_client_channel_factory* client_channel_factory; |
|
|
|
|
|
|
|
|
|
/** combiner protecting all variables below in this data structure */ |
|
|
|
|
grpc_combiner *combiner; |
|
|
|
|
grpc_combiner* combiner; |
|
|
|
|
/** currently active load balancer */ |
|
|
|
|
grpc_lb_policy *lb_policy; |
|
|
|
|
grpc_lb_policy* lb_policy; |
|
|
|
|
/** retry throttle data */ |
|
|
|
|
grpc_server_retry_throttle_data *retry_throttle_data; |
|
|
|
|
grpc_server_retry_throttle_data* retry_throttle_data; |
|
|
|
|
/** maps method names to method_parameters structs */ |
|
|
|
|
grpc_slice_hash_table *method_params_table; |
|
|
|
|
grpc_slice_hash_table* method_params_table; |
|
|
|
|
/** incoming resolver result - set by resolver.next() */ |
|
|
|
|
grpc_channel_args *resolver_result; |
|
|
|
|
grpc_channel_args* resolver_result; |
|
|
|
|
/** a list of closures that are all waiting for resolver result to come in */ |
|
|
|
|
grpc_closure_list waiting_for_resolver_result_closures; |
|
|
|
|
/** resolver callback */ |
|
|
|
@@ -194,42 +194,42 @@ typedef struct client_channel_channel_data {
|
|
|
|
/** when an lb_policy arrives, should we try to exit idle */ |
|
|
|
|
bool exit_idle_when_lb_policy_arrives; |
|
|
|
|
/** owning stack */ |
|
|
|
|
grpc_channel_stack *owning_stack; |
|
|
|
|
grpc_channel_stack* owning_stack; |
|
|
|
|
/** interested parties (owned) */ |
|
|
|
|
grpc_pollset_set *interested_parties; |
|
|
|
|
grpc_pollset_set* interested_parties; |
|
|
|
|
|
|
|
|
|
/* external_connectivity_watcher_list head is guarded by its own mutex, since
|
|
|
|
|
* counts need to be grabbed immediately without polling on a cq */ |
|
|
|
|
gpr_mu external_connectivity_watcher_list_mu; |
|
|
|
|
struct external_connectivity_watcher *external_connectivity_watcher_list_head; |
|
|
|
|
struct external_connectivity_watcher* external_connectivity_watcher_list_head; |
|
|
|
|
|
|
|
|
|
/* the following properties are guarded by a mutex since API's require them
|
|
|
|
|
to be instantaneously available */ |
|
|
|
|
gpr_mu info_mu; |
|
|
|
|
char *info_lb_policy_name; |
|
|
|
|
char* info_lb_policy_name; |
|
|
|
|
/** service config in JSON form */ |
|
|
|
|
char *info_service_config_json; |
|
|
|
|
char* info_service_config_json; |
|
|
|
|
} channel_data; |
|
|
|
|
|
|
|
|
|
/** We create one watcher for each new lb_policy that is returned from a
|
|
|
|
|
resolver, to watch for state changes from the lb_policy. When a state |
|
|
|
|
change is seen, we update the channel, and create a new watcher. */ |
|
|
|
|
typedef struct { |
|
|
|
|
channel_data *chand; |
|
|
|
|
channel_data* chand; |
|
|
|
|
grpc_closure on_changed; |
|
|
|
|
grpc_connectivity_state state; |
|
|
|
|
grpc_lb_policy *lb_policy; |
|
|
|
|
grpc_lb_policy* lb_policy; |
|
|
|
|
} lb_policy_connectivity_watcher; |
|
|
|
|
|
|
|
|
|
static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, |
|
|
|
|
grpc_lb_policy *lb_policy, |
|
|
|
|
static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand, |
|
|
|
|
grpc_lb_policy* lb_policy, |
|
|
|
|
grpc_connectivity_state current_state); |
|
|
|
|
|
|
|
|
|
static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
channel_data *chand, |
|
|
|
|
static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
channel_data* chand, |
|
|
|
|
grpc_connectivity_state state, |
|
|
|
|
grpc_error *error, |
|
|
|
|
const char *reason) { |
|
|
|
|
grpc_error* error, |
|
|
|
|
const char* reason) { |
|
|
|
|
/* TODO: Improve failure handling:
|
|
|
|
|
* - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. |
|
|
|
|
* - Hand over pending picks from old policies during the switch that happens |
|
|
|
@@ -256,9 +256,9 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, grpc_error *error) { |
|
|
|
|
lb_policy_connectivity_watcher *w = (lb_policy_connectivity_watcher *)arg; |
|
|
|
|
static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg; |
|
|
|
|
grpc_connectivity_state publish_state = w->state; |
|
|
|
|
/* check if the notification is for the latest policy */ |
|
|
|
|
if (w->lb_policy == w->chand->lb_policy) { |
|
|
|
@@ -282,11 +282,11 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
gpr_free(w); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, |
|
|
|
|
grpc_lb_policy *lb_policy, |
|
|
|
|
static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand, |
|
|
|
|
grpc_lb_policy* lb_policy, |
|
|
|
|
grpc_connectivity_state current_state) { |
|
|
|
|
lb_policy_connectivity_watcher *w = |
|
|
|
|
(lb_policy_connectivity_watcher *)gpr_malloc(sizeof(*w)); |
|
|
|
|
lb_policy_connectivity_watcher* w = |
|
|
|
|
(lb_policy_connectivity_watcher*)gpr_malloc(sizeof(*w)); |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); |
|
|
|
|
w->chand = chand; |
|
|
|
|
GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, |
|
|
|
@@ -297,8 +297,8 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
|
|
|
|
&w->on_changed); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void start_resolving_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
channel_data *chand) { |
|
|
|
|
static void start_resolving_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
channel_data* chand) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); |
|
|
|
|
} |
|
|
|
@@ -310,19 +310,19 @@ static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
char *server_name; |
|
|
|
|
grpc_server_retry_throttle_data *retry_throttle_data; |
|
|
|
|
char* server_name; |
|
|
|
|
grpc_server_retry_throttle_data* retry_throttle_data; |
|
|
|
|
} service_config_parsing_state; |
|
|
|
|
|
|
|
|
|
static void parse_retry_throttle_params(const grpc_json *field, void *arg) { |
|
|
|
|
service_config_parsing_state *parsing_state = |
|
|
|
|
(service_config_parsing_state *)arg; |
|
|
|
|
static void parse_retry_throttle_params(const grpc_json* field, void* arg) { |
|
|
|
|
service_config_parsing_state* parsing_state = |
|
|
|
|
(service_config_parsing_state*)arg; |
|
|
|
|
if (strcmp(field->key, "retryThrottling") == 0) { |
|
|
|
|
if (parsing_state->retry_throttle_data != NULL) return; // Duplicate.
|
|
|
|
|
if (field->type != GRPC_JSON_OBJECT) return; |
|
|
|
|
int max_milli_tokens = 0; |
|
|
|
|
int milli_token_ratio = 0; |
|
|
|
|
for (grpc_json *sub_field = field->child; sub_field != NULL; |
|
|
|
|
for (grpc_json* sub_field = field->child; sub_field != NULL; |
|
|
|
|
sub_field = sub_field->next) { |
|
|
|
|
if (sub_field->key == NULL) return; |
|
|
|
|
if (strcmp(sub_field->key, "maxTokens") == 0) { |
|
|
|
@@ -338,7 +338,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
|
|
|
|
size_t whole_len = strlen(sub_field->value); |
|
|
|
|
uint32_t multiplier = 1; |
|
|
|
|
uint32_t decimal_value = 0; |
|
|
|
|
const char *decimal_point = strchr(sub_field->value, '.'); |
|
|
|
|
const char* decimal_point = strchr(sub_field->value, '.'); |
|
|
|
|
if (decimal_point != NULL) { |
|
|
|
|
whole_len = (size_t)(decimal_point - sub_field->value); |
|
|
|
|
multiplier = 1000; |
|
|
|
@@ -369,25 +369,25 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, grpc_error *error) { |
|
|
|
|
channel_data *chand = (channel_data *)arg; |
|
|
|
|
static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
channel_data* chand = (channel_data*)arg; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, |
|
|
|
|
grpc_error_string(error)); |
|
|
|
|
} |
|
|
|
|
// Extract the following fields from the resolver result, if non-NULL.
|
|
|
|
|
bool lb_policy_updated = false; |
|
|
|
|
char *lb_policy_name_dup = NULL; |
|
|
|
|
char* lb_policy_name_dup = NULL; |
|
|
|
|
bool lb_policy_name_changed = false; |
|
|
|
|
grpc_lb_policy *new_lb_policy = NULL; |
|
|
|
|
char *service_config_json = NULL; |
|
|
|
|
grpc_server_retry_throttle_data *retry_throttle_data = NULL; |
|
|
|
|
grpc_slice_hash_table *method_params_table = NULL; |
|
|
|
|
grpc_lb_policy* new_lb_policy = NULL; |
|
|
|
|
char* service_config_json = NULL; |
|
|
|
|
grpc_server_retry_throttle_data* retry_throttle_data = NULL; |
|
|
|
|
grpc_slice_hash_table* method_params_table = NULL; |
|
|
|
|
if (chand->resolver_result != NULL) { |
|
|
|
|
// Find LB policy name.
|
|
|
|
|
const char *lb_policy_name = NULL; |
|
|
|
|
const grpc_arg *channel_arg = |
|
|
|
|
const char* lb_policy_name = NULL; |
|
|
|
|
const grpc_arg* channel_arg = |
|
|
|
|
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); |
|
|
|
|
if (channel_arg != NULL) { |
|
|
|
|
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); |
|
|
|
@@ -398,8 +398,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
channel_arg = |
|
|
|
|
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); |
|
|
|
|
if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) { |
|
|
|
|
grpc_lb_addresses *addresses = |
|
|
|
|
(grpc_lb_addresses *)channel_arg->value.pointer.p; |
|
|
|
|
grpc_lb_addresses* addresses = |
|
|
|
|
(grpc_lb_addresses*)channel_arg->value.pointer.p; |
|
|
|
|
bool found_balancer_address = false; |
|
|
|
|
for (size_t i = 0; i < addresses->num_addresses; ++i) { |
|
|
|
|
if (addresses->addresses[i].is_balancer) { |
|
|
|
@@ -450,14 +450,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
if (channel_arg != NULL) { |
|
|
|
|
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); |
|
|
|
|
service_config_json = gpr_strdup(channel_arg->value.string); |
|
|
|
|
grpc_service_config *service_config = |
|
|
|
|
grpc_service_config* service_config = |
|
|
|
|
grpc_service_config_create(service_config_json); |
|
|
|
|
if (service_config != NULL) { |
|
|
|
|
channel_arg = |
|
|
|
|
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); |
|
|
|
|
GPR_ASSERT(channel_arg != NULL); |
|
|
|
|
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); |
|
|
|
|
grpc_uri *uri = |
|
|
|
|
grpc_uri* uri = |
|
|
|
|
grpc_uri_parse(exec_ctx, channel_arg->value.string, true); |
|
|
|
|
GPR_ASSERT(uri->path[0] != '\0'); |
|
|
|
|
service_config_parsing_state parsing_state; |
|
|
|
@@ -560,7 +560,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
&chand->waiting_for_resolver_result_closures); |
|
|
|
|
} else { // Not shutting down.
|
|
|
|
|
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; |
|
|
|
|
grpc_error *state_error = |
|
|
|
|
grpc_error* state_error = |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); |
|
|
|
|
if (new_lb_policy != NULL) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
@@ -592,12 +592,12 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
grpc_transport_op *op = (grpc_transport_op *)arg; |
|
|
|
|
grpc_channel_element *elem = |
|
|
|
|
(grpc_channel_element *)op->handler_private.extra_arg; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error_ignored) { |
|
|
|
|
grpc_transport_op* op = (grpc_transport_op*)arg; |
|
|
|
|
grpc_channel_element* elem = |
|
|
|
|
(grpc_channel_element*)op->handler_private.extra_arg; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
|
|
|
|
|
if (op->on_connectivity_state_change != NULL) { |
|
|
|
|
grpc_connectivity_state_notify_on_state_change( |
|
|
|
@@ -648,10 +648,10 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_channel_element *elem, |
|
|
|
|
grpc_transport_op *op) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static void cc_start_transport_op(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_channel_element* elem, |
|
|
|
|
grpc_transport_op* op) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(op->set_accept_stream == false); |
|
|
|
|
if (op->bind_pollset != NULL) { |
|
|
|
@@ -668,10 +668,10 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_channel_element *elem, |
|
|
|
|
const grpc_channel_info *info) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static void cc_get_channel_info(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_channel_element* elem, |
|
|
|
|
const grpc_channel_info* info) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
gpr_mu_lock(&chand->info_mu); |
|
|
|
|
if (info->lb_policy_name != NULL) { |
|
|
|
|
*info->lb_policy_name = chand->info_lb_policy_name == NULL |
|
|
|
@@ -688,10 +688,10 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Constructor for channel_data */ |
|
|
|
|
static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_channel_element *elem, |
|
|
|
|
grpc_channel_element_args *args) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_channel_element* elem, |
|
|
|
|
grpc_channel_element_args* args) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
GPR_ASSERT(args->is_last); |
|
|
|
|
GPR_ASSERT(elem->filter == &grpc_client_channel_filter); |
|
|
|
|
// Initialize data members.
|
|
|
|
@@ -712,7 +712,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
|
|
|
|
"client_channel"); |
|
|
|
|
grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties); |
|
|
|
|
// Record client channel factory.
|
|
|
|
|
const grpc_arg *arg = grpc_channel_args_find(args->channel_args, |
|
|
|
|
const grpc_arg* arg = grpc_channel_args_find(args->channel_args, |
|
|
|
|
GRPC_ARG_CLIENT_CHANNEL_FACTORY); |
|
|
|
|
if (arg == NULL) { |
|
|
|
|
return GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
@@ -723,9 +723,9 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
|
|
|
|
"client channel factory arg must be a pointer"); |
|
|
|
|
} |
|
|
|
|
grpc_client_channel_factory_ref( |
|
|
|
|
(grpc_client_channel_factory *)arg->value.pointer.p); |
|
|
|
|
(grpc_client_channel_factory*)arg->value.pointer.p); |
|
|
|
|
chand->client_channel_factory = |
|
|
|
|
(grpc_client_channel_factory *)arg->value.pointer.p; |
|
|
|
|
(grpc_client_channel_factory*)arg->value.pointer.p; |
|
|
|
|
// Get server name to resolve, using proxy mapper if needed.
|
|
|
|
|
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); |
|
|
|
|
if (arg == NULL) { |
|
|
|
@@ -736,8 +736,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
|
|
|
|
return GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"server uri arg must be a string"); |
|
|
|
|
} |
|
|
|
|
char *proxy_name = NULL; |
|
|
|
|
grpc_channel_args *new_args = NULL; |
|
|
|
|
char* proxy_name = NULL; |
|
|
|
|
grpc_channel_args* new_args = NULL; |
|
|
|
|
grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args, |
|
|
|
|
&proxy_name, &new_args); |
|
|
|
|
// Instantiate resolver.
|
|
|
|
@@ -755,21 +755,22 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_resolver *resolver = (grpc_resolver *)arg; |
|
|
|
|
static void shutdown_resolver_locked(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
grpc_resolver* resolver = (grpc_resolver*)arg; |
|
|
|
|
grpc_resolver_shutdown_locked(exec_ctx, resolver); |
|
|
|
|
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Destructor for channel_data */ |
|
|
|
|
static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_channel_element *elem) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static void cc_destroy_channel_elem(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_channel_element* elem) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
if (chand->resolver != NULL) { |
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner)), |
|
|
|
|
exec_ctx, |
|
|
|
|
GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner)), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
if (chand->client_channel_factory != NULL) { |
|
|
|
@@ -829,45 +830,45 @@ typedef struct client_channel_call_data {
|
|
|
|
grpc_slice path; // Request path.
|
|
|
|
|
gpr_timespec call_start_time; |
|
|
|
|
grpc_millis deadline; |
|
|
|
|
gpr_arena *arena; |
|
|
|
|
grpc_call_stack *owning_call; |
|
|
|
|
grpc_call_combiner *call_combiner; |
|
|
|
|
gpr_arena* arena; |
|
|
|
|
grpc_call_stack* owning_call; |
|
|
|
|
grpc_call_combiner* call_combiner; |
|
|
|
|
|
|
|
|
|
grpc_server_retry_throttle_data *retry_throttle_data; |
|
|
|
|
method_parameters *method_params; |
|
|
|
|
grpc_server_retry_throttle_data* retry_throttle_data; |
|
|
|
|
method_parameters* method_params; |
|
|
|
|
|
|
|
|
|
grpc_subchannel_call *subchannel_call; |
|
|
|
|
grpc_error *error; |
|
|
|
|
grpc_subchannel_call* subchannel_call; |
|
|
|
|
grpc_error* error; |
|
|
|
|
|
|
|
|
|
grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
|
|
|
|
|
grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending.
|
|
|
|
|
grpc_closure lb_pick_closure; |
|
|
|
|
grpc_closure lb_pick_cancel_closure; |
|
|
|
|
|
|
|
|
|
grpc_connected_subchannel *connected_subchannel; |
|
|
|
|
grpc_connected_subchannel* connected_subchannel; |
|
|
|
|
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; |
|
|
|
|
grpc_polling_entity *pollent; |
|
|
|
|
grpc_polling_entity* pollent; |
|
|
|
|
|
|
|
|
|
grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES]; |
|
|
|
|
grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES]; |
|
|
|
|
size_t waiting_for_pick_batches_count; |
|
|
|
|
grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES]; |
|
|
|
|
|
|
|
|
|
grpc_transport_stream_op_batch *initial_metadata_batch; |
|
|
|
|
grpc_transport_stream_op_batch* initial_metadata_batch; |
|
|
|
|
|
|
|
|
|
grpc_linked_mdelem lb_token_mdelem; |
|
|
|
|
|
|
|
|
|
grpc_closure on_complete; |
|
|
|
|
grpc_closure *original_on_complete; |
|
|
|
|
grpc_closure* original_on_complete; |
|
|
|
|
} call_data; |
|
|
|
|
|
|
|
|
|
grpc_subchannel_call *grpc_client_channel_get_subchannel_call( |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
grpc_subchannel_call* grpc_client_channel_get_subchannel_call( |
|
|
|
|
grpc_call_element* elem) { |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
return calld->subchannel_call; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This is called via the call combiner, so access to calld is synchronized.
|
|
|
|
|
static void waiting_for_pick_batches_add( |
|
|
|
|
call_data *calld, grpc_transport_stream_op_batch *batch) { |
|
|
|
|
call_data* calld, grpc_transport_stream_op_batch* batch) { |
|
|
|
|
if (batch->send_initial_metadata) { |
|
|
|
|
GPR_ASSERT(calld->initial_metadata_batch == NULL); |
|
|
|
|
calld->initial_metadata_batch = batch; |
|
|
|
@@ -879,9 +880,9 @@ static void waiting_for_pick_batches_add(
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This is called via the call combiner, so access to calld is synchronized.
|
|
|
|
|
static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, grpc_error *error) { |
|
|
|
|
call_data *calld = (call_data *)arg; |
|
|
|
|
static void fail_pending_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
call_data* calld = (call_data*)arg; |
|
|
|
|
if (calld->waiting_for_pick_batches_count > 0) { |
|
|
|
|
--calld->waiting_for_pick_batches_count; |
|
|
|
|
grpc_transport_stream_op_batch_finish_with_failure( |
|
|
|
@@ -892,10 +893,10 @@ static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This is called via the call combiner, so access to calld is synchronized.
|
|
|
|
|
static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void waiting_for_pick_batches_fail(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", |
|
|
|
@@ -923,9 +924,9 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This is called via the call combiner, so access to calld is synchronized.
|
|
|
|
|
static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, grpc_error *ignored) { |
|
|
|
|
call_data *calld = (call_data *)arg; |
|
|
|
|
static void run_pending_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, |
|
|
|
|
void* arg, grpc_error* ignored) { |
|
|
|
|
call_data* calld = (call_data*)arg; |
|
|
|
|
if (calld->waiting_for_pick_batches_count > 0) { |
|
|
|
|
--calld->waiting_for_pick_batches_count; |
|
|
|
|
grpc_subchannel_call_process_op( |
|
|
|
@@ -935,13 +936,14 @@ static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This is called via the call combiner, so access to calld is synchronized.
|
|
|
|
|
static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void waiting_for_pick_batches_resume(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIuPTR |
|
|
|
|
" pending batches to subchannel_call=%p", |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: sending %" PRIuPTR |
|
|
|
|
" pending batches to subchannel_call=%p", |
|
|
|
|
chand, calld, calld->waiting_for_pick_batches_count, |
|
|
|
|
calld->subchannel_call); |
|
|
|
|
} |
|
|
|
@@ -961,10 +963,10 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
|
|
|
|
|
|
|
|
|
// Applies service config to the call. Must be invoked once we know
|
|
|
|
|
// that the resolver has returned results to the channel.
|
|
|
|
|
static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void apply_service_config_to_call_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", |
|
|
|
|
chand, calld); |
|
|
|
@@ -974,7 +976,7 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); |
|
|
|
|
} |
|
|
|
|
if (chand->method_params_table != NULL) { |
|
|
|
|
calld->method_params = (method_parameters *)grpc_method_config_table_get( |
|
|
|
|
calld->method_params = (method_parameters*)grpc_method_config_table_get( |
|
|
|
|
exec_ctx, chand->method_params_table, calld->path); |
|
|
|
|
if (calld->method_params != NULL) { |
|
|
|
|
method_parameters_ref(calld->method_params); |
|
|
|
@@ -994,11 +996,11 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void create_subchannel_call_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
const grpc_connected_subchannel_call_args call_args = { |
|
|
|
|
calld->pollent, // pollent
|
|
|
|
|
calld->path, // path
|
|
|
|
@@ -1008,7 +1010,7 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
calld->subchannel_call_context, // context
|
|
|
|
|
calld->call_combiner // call_combiner
|
|
|
|
|
}; |
|
|
|
|
grpc_error *new_error = grpc_connected_subchannel_create_call( |
|
|
|
|
grpc_error* new_error = grpc_connected_subchannel_create_call( |
|
|
|
|
exec_ctx, calld->connected_subchannel, &call_args, |
|
|
|
|
&calld->subchannel_call); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
@@ -1025,10 +1027,10 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Invoked when a pick is completed, on both success or failure.
|
|
|
|
|
static void pick_done_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static void pick_done_locked(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
if (calld->connected_subchannel == NULL) { |
|
|
|
|
// Failed to create subchannel.
|
|
|
|
|
GRPC_ERROR_UNREF(calld->error); |
|
|
|
@@ -1054,10 +1056,10 @@ static void pick_done_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
|
|
|
|
// either (a) the pick was deferred pending a resolver result or (b) the
|
|
|
|
|
// pick was done asynchronously. Removes the call's polling entity from
|
|
|
|
|
// chand->interested_parties before invoking pick_done_locked().
|
|
|
|
|
static void async_pick_done_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem, grpc_error *error) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void async_pick_done_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem, grpc_error* error) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, |
|
|
|
|
chand->interested_parties); |
|
|
|
|
pick_done_locked(exec_ctx, elem, error); |
|
|
|
@@ -1065,11 +1067,11 @@ static void async_pick_done_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
|
|
|
|
|
// Note: This runs under the client_channel combiner, but will NOT be
|
|
|
|
|
// holding the call combiner.
|
|
|
|
|
static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_call_element *elem = (grpc_call_element *)arg; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void pick_callback_cancel_locked(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
grpc_call_element* elem = (grpc_call_element*)arg; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (calld->lb_policy != NULL) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", |
|
|
|
@@ -1084,11 +1086,11 @@ static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg,
|
|
|
|
|
|
|
|
|
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
|
|
|
|
|
// Unrefs the LB policy and invokes async_pick_done_locked().
|
|
|
|
|
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_call_element *elem = (grpc_call_element *)arg; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void pick_callback_done_locked(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
grpc_call_element* elem = (grpc_call_element*)arg; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", |
|
|
|
|
chand, calld); |
|
|
|
@@ -1102,10 +1104,10 @@ static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
|
|
|
|
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
|
|
|
|
|
// If the pick was completed synchronously, unrefs the LB policy and
|
|
|
|
|
// returns true.
|
|
|
|
|
static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static bool pick_callback_start_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", |
|
|
|
|
chand, calld, chand->lb_policy); |
|
|
|
@@ -1162,7 +1164,7 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
grpc_call_element *elem; |
|
|
|
|
grpc_call_element* elem; |
|
|
|
|
bool finished; |
|
|
|
|
grpc_closure closure; |
|
|
|
|
grpc_closure cancel_closure; |
|
|
|
@@ -1170,11 +1172,10 @@ typedef struct {
|
|
|
|
|
|
|
|
|
// Note: This runs under the client_channel combiner, but will NOT be
|
|
|
|
|
// holding the call combiner.
|
|
|
|
|
static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
pick_after_resolver_result_args *args = |
|
|
|
|
(pick_after_resolver_result_args *)arg; |
|
|
|
|
static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg; |
|
|
|
|
if (args->finished) { |
|
|
|
|
gpr_free(args); |
|
|
|
|
return; |
|
|
|
@@ -1187,9 +1188,9 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
// is called, it will be a no-op. We also immediately invoke
|
|
|
|
|
// async_pick_done_locked() to propagate the error back to the caller.
|
|
|
|
|
args->finished = true; |
|
|
|
|
grpc_call_element *elem = args->elem; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
grpc_call_element* elem = args->elem; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: cancelling pick waiting for resolver result", |
|
|
|
@@ -1205,14 +1206,13 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
"Pick cancelled", &error, 1)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem); |
|
|
|
|
static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem); |
|
|
|
|
|
|
|
|
|
static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
pick_after_resolver_result_args *args = |
|
|
|
|
(pick_after_resolver_result_args *)arg; |
|
|
|
|
static void pick_after_resolver_result_done_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg; |
|
|
|
|
if (args->finished) { |
|
|
|
|
/* cancelled, do nothing */ |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
@@ -1222,9 +1222,9 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
args->finished = true; |
|
|
|
|
grpc_call_element *elem = args->elem; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
grpc_call_element* elem = args->elem; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", |
|
|
|
@@ -1271,17 +1271,17 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"chand=%p calld=%p: deferring pick pending resolver result", chand, |
|
|
|
|
calld); |
|
|
|
|
} |
|
|
|
|
pick_after_resolver_result_args *args = |
|
|
|
|
(pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); |
|
|
|
|
pick_after_resolver_result_args* args = |
|
|
|
|
(pick_after_resolver_result_args*)gpr_zalloc(sizeof(*args)); |
|
|
|
|
args->elem = elem; |
|
|
|
|
GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, |
|
|
|
|
args, grpc_combiner_scheduler(chand->combiner)); |
|
|
|
@@ -1294,11 +1294,11 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
grpc_combiner_scheduler(chand->combiner))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *ignored) { |
|
|
|
|
grpc_call_element *elem = (grpc_call_element *)arg; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static void start_pick_locked(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* ignored) { |
|
|
|
|
grpc_call_element* elem = (grpc_call_element*)arg; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
GPR_ASSERT(calld->connected_subchannel == NULL); |
|
|
|
|
if (chand->lb_policy != NULL) { |
|
|
|
|
// We already have an LB policy, so ask it for a pick.
|
|
|
|
@@ -1328,9 +1328,9 @@ static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg,
|
|
|
|
chand->interested_parties); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
|
grpc_call_element *elem = (grpc_call_element *)arg; |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { |
|
|
|
|
grpc_call_element* elem = (grpc_call_element*)arg; |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
if (calld->retry_throttle_data != NULL) { |
|
|
|
|
if (error == GRPC_ERROR_NONE) { |
|
|
|
|
grpc_server_retry_throttle_data_record_success( |
|
|
|
@@ -1349,10 +1349,10 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cc_start_transport_stream_op_batch( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_call_element *elem, |
|
|
|
|
grpc_transport_stream_op_batch *batch) { |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
grpc_exec_ctx* exec_ctx, grpc_call_element* elem, |
|
|
|
|
grpc_transport_stream_op_batch* batch) { |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
if (chand->deadline_checking_enabled) { |
|
|
|
|
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, |
|
|
|
|
batch); |
|
|
|
@@ -1443,11 +1443,11 @@ done:
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Constructor for call_data */ |
|
|
|
|
static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
const grpc_call_element_args *args) { |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static grpc_error* cc_init_call_elem(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem, |
|
|
|
|
const grpc_call_element_args* args) { |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
// Initialize data members.
|
|
|
|
|
calld->path = grpc_slice_ref_internal(args->path); |
|
|
|
|
calld->call_start_time = args->start_time; |
|
|
|
@@ -1463,12 +1463,12 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Destructor for call_data */ |
|
|
|
|
static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
const grpc_call_final_info *final_info, |
|
|
|
|
grpc_closure *then_schedule_closure) { |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
static void cc_destroy_call_elem(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem, |
|
|
|
|
const grpc_call_final_info* final_info, |
|
|
|
|
grpc_closure* then_schedule_closure) { |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
if (chand->deadline_checking_enabled) { |
|
|
|
|
grpc_deadline_state_destroy(exec_ctx, elem); |
|
|
|
|
} |
|
|
|
@@ -1499,10 +1499,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
grpc_polling_entity *pollent) { |
|
|
|
|
call_data *calld = (call_data *)elem->call_data; |
|
|
|
|
static void cc_set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_call_element* elem, |
|
|
|
|
grpc_polling_entity* pollent) { |
|
|
|
|
call_data* calld = (call_data*)elem->call_data; |
|
|
|
|
calld->pollent = pollent; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@@ -1524,9 +1524,9 @@ const grpc_channel_filter grpc_client_channel_filter = {
|
|
|
|
"client-channel", |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
channel_data *chand = (channel_data *)arg; |
|
|
|
|
static void try_to_connect_locked(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error_ignored) { |
|
|
|
|
channel_data* chand = (channel_data*)arg; |
|
|
|
|
if (chand->lb_policy != NULL) { |
|
|
|
|
grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy); |
|
|
|
|
} else { |
|
|
|
@@ -1539,34 +1539,35 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_connectivity_state grpc_client_channel_check_connectivity_state( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, int try_to_connect) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
grpc_connectivity_state out = |
|
|
|
|
grpc_connectivity_state_check(&chand->state_tracker); |
|
|
|
|
if (out == GRPC_CHANNEL_IDLE && try_to_connect) { |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); |
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner)), |
|
|
|
|
exec_ctx, |
|
|
|
|
GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner)), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
return out; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
typedef struct external_connectivity_watcher { |
|
|
|
|
channel_data *chand; |
|
|
|
|
channel_data* chand; |
|
|
|
|
grpc_polling_entity pollent; |
|
|
|
|
grpc_closure *on_complete; |
|
|
|
|
grpc_closure *watcher_timer_init; |
|
|
|
|
grpc_connectivity_state *state; |
|
|
|
|
grpc_closure* on_complete; |
|
|
|
|
grpc_closure* watcher_timer_init; |
|
|
|
|
grpc_connectivity_state* state; |
|
|
|
|
grpc_closure my_closure; |
|
|
|
|
struct external_connectivity_watcher *next; |
|
|
|
|
struct external_connectivity_watcher* next; |
|
|
|
|
} external_connectivity_watcher; |
|
|
|
|
|
|
|
|
|
static external_connectivity_watcher *lookup_external_connectivity_watcher( |
|
|
|
|
channel_data *chand, grpc_closure *on_complete) { |
|
|
|
|
static external_connectivity_watcher* lookup_external_connectivity_watcher( |
|
|
|
|
channel_data* chand, grpc_closure* on_complete) { |
|
|
|
|
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
|
|
|
|
external_connectivity_watcher *w = |
|
|
|
|
external_connectivity_watcher* w = |
|
|
|
|
chand->external_connectivity_watcher_list_head; |
|
|
|
|
while (w != NULL && w->on_complete != on_complete) { |
|
|
|
|
w = w->next; |
|
|
|
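The external_connectivity_watcher entries above form an intrusive singly-linked list hanging off channel_data, guarded by external_connectivity_watcher_list_mu rather than the combiner, because (per the comment on the field earlier in this diff) the watcher count has to be readable immediately without polling on a completion queue. A reduced sketch of that list discipline, with illustrative names (`node`, `list_head`, `list_mu`) that are not from this file:

    typedef struct node {
      grpc_closure* on_complete;  // key used by lookup
      struct node* next;
    } node;

    static gpr_mu list_mu;          // guards list_head and every node's next pointer
    static node* list_head = NULL;

    static node* list_lookup(grpc_closure* on_complete) {
      gpr_mu_lock(&list_mu);
      node* n = list_head;
      while (n != NULL && n->on_complete != on_complete) n = n->next;
      gpr_mu_unlock(&list_mu);
      return n;
    }

    static void list_append(node* n) {
      gpr_mu_lock(&list_mu);
      n->next = list_head;  // push at the head; ordering is not significant
      list_head = n;
      gpr_mu_unlock(&list_mu);
    }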
@@ -1576,7 +1577,7 @@ static external_connectivity_watcher *lookup_external_connectivity_watcher(
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void external_connectivity_watcher_list_append( |
|
|
|
|
channel_data *chand, external_connectivity_watcher *w) { |
|
|
|
|
channel_data* chand, external_connectivity_watcher* w) { |
|
|
|
|
GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); |
|
|
|
@@ -1587,7 +1588,7 @@ static void external_connectivity_watcher_list_append(
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void external_connectivity_watcher_list_remove( |
|
|
|
|
channel_data *chand, external_connectivity_watcher *too_remove) { |
|
|
|
|
channel_data* chand, external_connectivity_watcher* too_remove) { |
|
|
|
|
GPR_ASSERT( |
|
|
|
|
lookup_external_connectivity_watcher(chand, too_remove->on_complete)); |
|
|
|
|
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
|
|
|
@@ -1596,7 +1597,7 @@ static void external_connectivity_watcher_list_remove(
|
|
|
|
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
external_connectivity_watcher *w = |
|
|
|
|
external_connectivity_watcher* w = |
|
|
|
|
chand->external_connectivity_watcher_list_head; |
|
|
|
|
while (w != NULL) { |
|
|
|
|
if (w->next == too_remove) { |
|
|
|
@@ -1610,12 +1611,12 @@ static void external_connectivity_watcher_list_remove(
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_client_channel_num_external_connectivity_watchers( |
|
|
|
|
grpc_channel_element *elem) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
grpc_channel_element* elem) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
int count = 0; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); |
|
|
|
|
external_connectivity_watcher *w = |
|
|
|
|
external_connectivity_watcher* w = |
|
|
|
|
chand->external_connectivity_watcher_list_head; |
|
|
|
|
while (w != NULL) { |
|
|
|
|
count++; |
|
|
|
@@ -1626,10 +1627,10 @@ int grpc_client_channel_num_external_connectivity_watchers(
|
|
|
|
return count; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_external_watch_complete_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, grpc_error *error) { |
|
|
|
|
external_connectivity_watcher *w = (external_connectivity_watcher *)arg; |
|
|
|
|
grpc_closure *follow_up = w->on_complete; |
|
|
|
|
static void on_external_watch_complete_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
external_connectivity_watcher* w = (external_connectivity_watcher*)arg; |
|
|
|
|
grpc_closure* follow_up = w->on_complete; |
|
|
|
|
grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, |
|
|
|
|
w->chand->interested_parties); |
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, |
|
|
|
@@ -1639,10 +1640,10 @@ static void on_external_watch_complete_locked(grpc_exec_ctx *exec_ctx,
|
|
|
|
GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
external_connectivity_watcher *w = (external_connectivity_watcher *)arg; |
|
|
|
|
external_connectivity_watcher *found = NULL; |
|
|
|
|
static void watch_connectivity_state_locked(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error_ignored) { |
|
|
|
|
external_connectivity_watcher* w = (external_connectivity_watcher*)arg; |
|
|
|
|
external_connectivity_watcher* found = NULL; |
|
|
|
|
if (w->state != NULL) { |
|
|
|
|
external_connectivity_watcher_list_append(w->chand, w); |
|
|
|
|
GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); |
|
|
|
@@ -1667,12 +1668,12 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_client_channel_watch_connectivity_state( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, |
|
|
|
|
grpc_polling_entity pollent, grpc_connectivity_state *state, |
|
|
|
|
grpc_closure *closure, grpc_closure *watcher_timer_init) { |
|
|
|
|
channel_data *chand = (channel_data *)elem->channel_data; |
|
|
|
|
external_connectivity_watcher *w = |
|
|
|
|
(external_connectivity_watcher *)gpr_zalloc(sizeof(*w)); |
|
|
|
|
grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, |
|
|
|
|
grpc_polling_entity pollent, grpc_connectivity_state* state, |
|
|
|
|
grpc_closure* closure, grpc_closure* watcher_timer_init) { |
|
|
|
|
channel_data* chand = (channel_data*)elem->channel_data; |
|
|
|
|
external_connectivity_watcher* w = |
|
|
|
|
(external_connectivity_watcher*)gpr_zalloc(sizeof(*w)); |
|
|
|
|
w->chand = chand; |
|
|
|
|
w->pollent = pollent; |
|
|
|
|
w->on_complete = closure; |
|
|
|
|