Clean up method parameters in client channel code.

pull/9869/head
Mark D. Roth 8 years ago
parent bf1eb38ced
commit 95b627b7ad
  1. 176
      src/core/ext/client_channel/client_channel.c

@ -76,50 +76,58 @@ typedef enum {
WAIT_FOR_READY_TRUE WAIT_FOR_READY_TRUE
} wait_for_ready_value; } wait_for_ready_value;
typedef struct method_parameters { typedef struct {
gpr_refcount refs;
gpr_timespec timeout; gpr_timespec timeout;
wait_for_ready_value wait_for_ready; wait_for_ready_value wait_for_ready;
} method_parameters; } method_parameters;
static method_parameters* method_parameters_ref(
method_parameters *method_params) {
gpr_ref(&method_params->refs);
return method_params;
}
static void method_parameters_unref(method_parameters *method_params) {
if (gpr_unref(&method_params->refs)) {
gpr_free(method_params);
}
}
static void *method_parameters_copy(void *value) { static void *method_parameters_copy(void *value) {
void *new_value = gpr_malloc(sizeof(method_parameters)); return method_parameters_ref(value);
memcpy(new_value, value, sizeof(method_parameters));
return new_value;
} }
static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) { static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
gpr_free(p); method_parameters_unref(value);
} }
static const grpc_slice_hash_table_vtable method_parameters_vtable = { static const grpc_slice_hash_table_vtable method_parameters_vtable = {
method_parameters_free, method_parameters_copy}; method_parameters_free, method_parameters_copy};
static void *method_parameters_create_from_json(const grpc_json *json) { static bool parse_wait_for_ready(grpc_json *field,
wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; wait_for_ready_value *wait_for_ready) {
gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
for (grpc_json *field = json->child; field != NULL; field = field->next) {
if (field->key == NULL) continue;
if (strcmp(field->key, "waitForReady") == 0) {
if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
return NULL; return false;
} }
wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
: WAIT_FOR_READY_FALSE; : WAIT_FOR_READY_FALSE;
} else if (strcmp(field->key, "timeout") == 0) { return true;
if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate. }
if (field->type != GRPC_JSON_STRING) return NULL;
static bool parse_timeout(grpc_json *field, gpr_timespec* timeout) {
if (field->type != GRPC_JSON_STRING) return false;
size_t len = strlen(field->value); size_t len = strlen(field->value);
if (field->value[len - 1] != 's') return NULL; if (field->value[len - 1] != 's') return false;
char *buf = gpr_strdup(field->value); char *buf = gpr_strdup(field->value);
buf[len - 1] = '\0'; // Remove trailing 's'. buf[len - 1] = '\0'; // Remove trailing 's'.
char *decimal_point = strchr(buf, '.'); char *decimal_point = strchr(buf, '.');
if (decimal_point != NULL) { if (decimal_point != NULL) {
*decimal_point = '\0'; *decimal_point = '\0';
timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
if (timeout.tv_nsec == -1) { if (timeout->tv_nsec == -1) {
gpr_free(buf); gpr_free(buf);
return NULL; return false;
} }
// There should always be exactly 3, 6, or 9 fractional digits. // There should always be exactly 3, 6, or 9 fractional digits.
int multiplier = 1; int multiplier = 1;
@ -134,16 +142,31 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
break; break;
default: // Unsupported number of digits. default: // Unsupported number of digits.
gpr_free(buf); gpr_free(buf);
return NULL; return false;
} }
timeout.tv_nsec *= multiplier; timeout->tv_nsec *= multiplier;
} }
timeout.tv_sec = gpr_parse_nonnegative_int(buf); timeout->tv_sec = gpr_parse_nonnegative_int(buf);
if (timeout.tv_sec == -1) return NULL;
gpr_free(buf); gpr_free(buf);
if (timeout->tv_sec == -1) return false;
return true;
}
static void *method_parameters_create_from_json(const grpc_json *json) {
wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
for (grpc_json *field = json->child; field != NULL; field = field->next) {
if (field->key == NULL) continue;
if (strcmp(field->key, "waitForReady") == 0) {
if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
} else if (strcmp(field->key, "timeout") == 0) {
if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
if (!parse_timeout(field, &timeout)) return NULL;
} }
} }
method_parameters *value = gpr_malloc(sizeof(method_parameters)); method_parameters *value = gpr_malloc(sizeof(method_parameters));
gpr_ref_init(&value->refs, 1);
value->timeout = timeout; value->timeout = timeout;
value->wait_for_ready = wait_for_ready; value->wait_for_ready = wait_for_ready;
return value; return value;
@ -629,7 +652,7 @@ typedef struct client_channel_call_data {
grpc_slice path; // Request path. grpc_slice path; // Request path.
gpr_timespec call_start_time; gpr_timespec call_start_time;
gpr_timespec deadline; gpr_timespec deadline;
wait_for_ready_value wait_for_ready_from_service_config; method_parameters *method_params;
grpc_closure read_service_config; grpc_closure read_service_config;
grpc_error *cancel_error; grpc_error *cancel_error;
@ -836,10 +859,11 @@ static bool pick_subchannel_locked(
initial_metadata_flags & initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
const bool wait_for_ready_set_from_service_config = const bool wait_for_ready_set_from_service_config =
calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET; calld->method_params != NULL &&
calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
if (!wait_for_ready_set_from_api && if (!wait_for_ready_set_from_api &&
wait_for_ready_set_from_service_config) { wait_for_ready_set_from_service_config) {
if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) { if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
} else { } else {
initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
@ -977,10 +1001,9 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
add_waiting_locked(calld, op); add_waiting_locked(calld, op);
} }
static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
void *arg,
grpc_error *error_ignored) { grpc_error *error_ignored) {
GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
grpc_transport_stream_op *op = arg; grpc_transport_stream_op *op = arg;
grpc_call_element *elem = op->handler_private.args[0]; grpc_call_element *elem = op->handler_private.args[0];
@ -990,7 +1013,7 @@ static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx,
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
"start_transport_stream_op"); "start_transport_stream_op");
GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); GPR_TIMER_END("start_transport_stream_op_locked", 0);
} }
/* The logic here is fairly complicated, due to (a) the fact that we /* The logic here is fairly complicated, due to (a) the fact that we
@ -1030,53 +1053,54 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_closure_sched( grpc_closure_sched(
exec_ctx, exec_ctx,
grpc_closure_init(&op->handler_private.closure, grpc_closure_init(&op->handler_private.closure,
cc_start_transport_stream_op_locked, op, start_transport_stream_op_locked, op,
grpc_combiner_scheduler(chand->combiner, false)), grpc_combiner_scheduler(chand->combiner, false)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
GPR_TIMER_END("cc_start_transport_stream_op", 0); GPR_TIMER_END("cc_start_transport_stream_op", 0);
} }
// Sets calld->method_params.
// If the method params specify a timeout, populates
// *per_method_deadline and returns true.
static bool set_call_method_params_from_service_config_locked(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
gpr_timespec *per_method_deadline) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
if (chand->method_params_table != NULL) {
calld->method_params = grpc_method_config_table_get(
exec_ctx, chand->method_params_table, calld->path);
if (calld->method_params != NULL) {
method_parameters_ref(calld->method_params);
if (gpr_time_cmp(calld->method_params->timeout,
gpr_time_0(GPR_TIMESPAN)) != 0) {
*per_method_deadline = gpr_time_add(
calld->call_start_time, calld->method_params->timeout);
return true;
}
}
}
return false;
}
// Gets data from the service config. Invoked when the resolver returns // Gets data from the service config. Invoked when the resolver returns
// its initial result. // its initial result.
static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg, static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
grpc_call_element *elem = arg; grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
// If this is an error, there's no point in looking at the service config. // If this is an error, there's no point in looking at the service config.
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
// Get the method config table from channel data. gpr_timespec per_method_deadline;
grpc_slice_hash_table *method_params_table = NULL; if (set_call_method_params_from_service_config_locked(
if (chand->method_params_table != NULL) { exec_ctx, elem, &per_method_deadline)) {
method_params_table = // If the deadline from the service config is shorter than the one
grpc_slice_hash_table_ref(chand->method_params_table); // from the client API, reset the deadline timer.
}
// If the method config table was present, use it.
if (method_params_table != NULL) {
const method_parameters *method_params = grpc_method_config_table_get(
exec_ctx, method_params_table, calld->path);
if (method_params != NULL) {
const bool have_method_timeout =
gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0;
if (have_method_timeout ||
method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
if (have_method_timeout) {
const gpr_timespec per_method_deadline =
gpr_time_add(calld->call_start_time, method_params->timeout);
if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
calld->deadline = per_method_deadline; calld->deadline = per_method_deadline;
// Reset deadline timer.
grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
} }
} }
if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
calld->wait_for_ready_from_service_config =
method_params->wait_for_ready;
}
}
}
grpc_slice_hash_table_unref(exec_ctx, method_params_table);
}
} }
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
} }
@ -1090,30 +1114,13 @@ static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx,
// If the resolver has already returned results, then we can access // If the resolver has already returned results, then we can access
// the service config parameters immediately. Otherwise, we need to // the service config parameters immediately. Otherwise, we need to
// defer that work until the resolver returns an initial result. // defer that work until the resolver returns an initial result.
// TODO(roth): This code is almost but not quite identical to the code
// in read_service_config() above. It would be nice to find a way to
// combine them, to avoid having to maintain it twice.
if (chand->lb_policy != NULL) { if (chand->lb_policy != NULL) {
// We already have a resolver result, so check for service config. // We already have a resolver result, so check for service config.
if (chand->method_params_table != NULL) { gpr_timespec per_method_deadline;
grpc_slice_hash_table *method_params_table = if (set_call_method_params_from_service_config_locked(
grpc_slice_hash_table_ref(chand->method_params_table); exec_ctx, elem, &per_method_deadline)) {
method_parameters *method_params = grpc_method_config_table_get(
exec_ctx, method_params_table, calld->path);
if (method_params != NULL) {
if (gpr_time_cmp(method_params->timeout,
gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) {
gpr_timespec per_method_deadline =
gpr_time_add(calld->call_start_time, method_params->timeout);
calld->deadline = gpr_time_min(calld->deadline, per_method_deadline); calld->deadline = gpr_time_min(calld->deadline, per_method_deadline);
} }
if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
calld->wait_for_ready_from_service_config =
method_params->wait_for_ready;
}
}
grpc_slice_hash_table_unref(exec_ctx, method_params_table);
}
} else { } else {
// We don't yet have a resolver result, so register a callback to // We don't yet have a resolver result, so register a callback to
// get the service config data once the resolver returns. // get the service config data once the resolver returns.
@ -1143,7 +1150,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
calld->path = grpc_slice_ref_internal(args->path); calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time; calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; calld->method_params = NULL;
calld->cancel_error = GRPC_ERROR_NONE; calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_atm_rel_store(&calld->subchannel_call, 0);
calld->connected_subchannel = NULL; calld->connected_subchannel = NULL;
@ -1171,6 +1178,9 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem); grpc_deadline_state_destroy(exec_ctx, elem);
grpc_slice_unref_internal(exec_ctx, calld->path); grpc_slice_unref_internal(exec_ctx, calld->path);
if (calld->method_params != NULL) {
method_parameters_unref(calld->method_params);
}
GRPC_ERROR_UNREF(calld->cancel_error); GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld); grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) { if (call != NULL && call != CANCELLED_CALL) {

Loading…
Cancel
Save