diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index ef273669e84..e0b84ddd660 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -423,7 +423,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); GPR_ASSERT(channel_arg != NULL); GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true); + grpc_uri *uri = + grpc_uri_parse(exec_ctx, channel_arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); parsing_state.server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path; @@ -738,6 +739,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; + grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; grpc_error *cancel_error; @@ -814,7 +816,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { gpr_free(ops); } -// Sets calld->method_params. +// Sets calld->method_params and calld->retry_throttle_data. // If the method params specify a timeout, populates // *per_method_deadline and returns true. static bool set_call_method_params_from_service_config_locked( @@ -822,6 +824,10 @@ static bool set_call_method_params_from_service_config_locked( gpr_timespec *per_method_deadline) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + calld->retry_throttle_data = + grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { calld->method_params = grpc_method_config_table_get( exec_ctx, chand->method_params_table, calld->path); @@ -1135,19 +1141,18 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (chand->retry_throttle_data != NULL) { + if (calld->retry_throttle_data != NULL) { if (error == GRPC_ERROR_NONE) { grpc_server_retry_throttle_data_record_success( - &chand->retry_throttle_data); + calld->retry_throttle_data); } else { // TODO(roth): In a subsequent PR, check the return value here and // decide whether or not to retry. Note that we should only // record failures whose statuses match the configured retryable // or non-fatal status codes. grpc_server_retry_throttle_data_record_failure( - &chand->retry_throttle_data); + calld->retry_throttle_data); } } grpc_closure_run(exec_ctx, calld->original_on_complete, @@ -1160,14 +1165,13 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; if (op->recv_trailing_metadata != NULL) { GPR_ASSERT(op->on_complete != NULL); calld->original_on_complete = op->on_complete; grpc_closure_init(&calld->on_complete, on_complete_locked, elem, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_schedule_on_exec_ctx); op->on_complete = &calld->on_complete; } diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c index 2aa52e49033..7b813c33df4 100644 --- a/src/core/ext/client_channel/retry_throttle.c +++ b/src/core/ext/client_channel/retry_throttle.c @@ -64,26 +64,18 @@ static void get_replacement_throttle_data_if_needed( (grpc_server_retry_throttle_data*)gpr_atm_acq_load( &(*throttle_data)->replacement); if (new_throttle_data == NULL) return; - // Reset *throttle_data to its replacement, updating refcounts as - // appropriate. - // Note: It's safe to do this here, because the caller ensures that - // this will only be called with a given value of throttle_data from - // one thread at a time. - grpc_server_retry_throttle_data_ref(new_throttle_data); - grpc_server_retry_throttle_data* old_throttle_data = *throttle_data; *throttle_data = new_throttle_data; - grpc_server_retry_throttle_data_unref(old_throttle_data); } } bool grpc_server_retry_throttle_data_record_failure( - grpc_server_retry_throttle_data** throttle_data) { + grpc_server_retry_throttle_data* throttle_data) { // First, check if we are stale and need to be replaced. - get_replacement_throttle_data_if_needed(throttle_data); + get_replacement_throttle_data_if_needed(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. const int delta = -1000; const int old_value = (int)gpr_atm_full_fetch_add( - &(*throttle_data)->milli_tokens, (gpr_atm)delta); + &throttle_data->milli_tokens, (gpr_atm)delta); // If the above change takes us below 0, then re-add the excess. Note // that between these two atomic operations, the value will be // artificially low by as much as 1000, but this window should be @@ -91,41 +83,42 @@ bool grpc_server_retry_throttle_data_record_failure( int new_value = old_value - 1000; if (new_value < 0) { const int excess_value = new_value - (old_value < 0 ? old_value : 0); - gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + gpr_atm_full_fetch_add(&throttle_data->milli_tokens, (gpr_atm)-excess_value); new_value = 0; } // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). - return new_value > (*throttle_data)->max_milli_tokens / 2; + return new_value > throttle_data->max_milli_tokens / 2; } void grpc_server_retry_throttle_data_record_success( - grpc_server_retry_throttle_data** throttle_data) { + grpc_server_retry_throttle_data* throttle_data) { // First, check if we are stale and need to be replaced. - get_replacement_throttle_data_if_needed(throttle_data); + get_replacement_throttle_data_if_needed(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. - const int delta = (*throttle_data)->milli_token_ratio; + const int delta = throttle_data->milli_token_ratio; const int old_value = (int)gpr_atm_full_fetch_add( - &(*throttle_data)->milli_tokens, (gpr_atm)delta); + &throttle_data->milli_tokens, (gpr_atm)delta); // If the above change takes us over max_milli_tokens, then subtract // the excess. Note that between these two atomic operations, the // value will be artificially high by as much as milli_token_ratio, // but this window should be brief. - const int new_value = old_value + (*throttle_data)->milli_token_ratio; - if (new_value > (*throttle_data)->max_milli_tokens) { + const int new_value = old_value + throttle_data->milli_token_ratio; + if (new_value > throttle_data->max_milli_tokens) { const int excess_value = - new_value - (old_value > (*throttle_data)->max_milli_tokens + new_value - (old_value > throttle_data->max_milli_tokens ? old_value - : (*throttle_data)->max_milli_tokens); - gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + : throttle_data->max_milli_tokens); + gpr_atm_full_fetch_add(&throttle_data->milli_tokens, (gpr_atm)-excess_value); } } -void grpc_server_retry_throttle_data_ref( +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* throttle_data) { gpr_ref(&throttle_data->refs); + return throttle_data; } void grpc_server_retry_throttle_data_unref( @@ -189,8 +182,7 @@ static void destroy_server_retry_throttle_data(void* value) { static void* copy_server_retry_throttle_data(void* value) { grpc_server_retry_throttle_data* throttle_data = value; - grpc_server_retry_throttle_data_ref(throttle_data); - return value; + return grpc_server_retry_throttle_data_ref(throttle_data); } static const gpr_avl_vtable avl_vtable = { diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h index 4209bb7fb66..f9971faf651 100644 --- a/src/core/ext/client_channel/retry_throttle.h +++ b/src/core/ext/client_channel/retry_throttle.h @@ -40,17 +40,13 @@ typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; /// Records a failure. Returns true if it's okay to send a retry. -/// Updates \a throttle_data if the original value is stale and has been -/// replaced. Not thread safe; caller must synchronize. bool grpc_server_retry_throttle_data_record_failure( - grpc_server_retry_throttle_data** throttle_data); + grpc_server_retry_throttle_data* throttle_data); /// Records a success. -/// Updates \a throttle_data if the original value is stale and has been -/// replaced. Not thread safe; caller must synchronize. void grpc_server_retry_throttle_data_record_success( - grpc_server_retry_throttle_data** throttle_data); + grpc_server_retry_throttle_data* throttle_data); -void grpc_server_retry_throttle_data_ref( +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* throttle_data); void grpc_server_retry_throttle_data_unref( grpc_server_retry_throttle_data* throttle_data);