Don't use combiner lock for on_complete callback.

pull/9850/head
Mark D. Roth 8 years ago
parent 3a91707ace
commit 9ccbc4d5e5
  1. 20
      src/core/ext/client_channel/client_channel.c
  2. 42
      src/core/ext/client_channel/retry_throttle.c
  3. 10
      src/core/ext/client_channel/retry_throttle.h

@ -423,7 +423,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
GPR_ASSERT(channel_arg != NULL); GPR_ASSERT(channel_arg != NULL);
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true); grpc_uri *uri =
grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0'); GPR_ASSERT(uri->path[0] != '\0');
parsing_state.server_name = parsing_state.server_name =
uri->path[0] == '/' ? uri->path + 1 : uri->path; uri->path[0] == '/' ? uri->path + 1 : uri->path;
@ -738,6 +739,7 @@ typedef struct client_channel_call_data {
grpc_slice path; // Request path. grpc_slice path; // Request path.
gpr_timespec call_start_time; gpr_timespec call_start_time;
gpr_timespec deadline; gpr_timespec deadline;
grpc_server_retry_throttle_data *retry_throttle_data;
method_parameters *method_params; method_parameters *method_params;
grpc_error *cancel_error; grpc_error *cancel_error;
@ -814,7 +816,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
gpr_free(ops); gpr_free(ops);
} }
// Sets calld->method_params. // Sets calld->method_params and calld->retry_throttle_data.
// If the method params specify a timeout, populates // If the method params specify a timeout, populates
// *per_method_deadline and returns true. // *per_method_deadline and returns true.
static bool set_call_method_params_from_service_config_locked( static bool set_call_method_params_from_service_config_locked(
@ -822,6 +824,10 @@ static bool set_call_method_params_from_service_config_locked(
gpr_timespec *per_method_deadline) { gpr_timespec *per_method_deadline) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
if (chand->retry_throttle_data != NULL) {
calld->retry_throttle_data =
grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
}
if (chand->method_params_table != NULL) { if (chand->method_params_table != NULL) {
calld->method_params = grpc_method_config_table_get( calld->method_params = grpc_method_config_table_get(
exec_ctx, chand->method_params_table, calld->path); exec_ctx, chand->method_params_table, calld->path);
@ -1135,19 +1141,18 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
grpc_call_element *elem = arg; grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
if (chand->retry_throttle_data != NULL) { if (calld->retry_throttle_data != NULL) {
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
grpc_server_retry_throttle_data_record_success( grpc_server_retry_throttle_data_record_success(
&chand->retry_throttle_data); calld->retry_throttle_data);
} else { } else {
// TODO(roth): In a subsequent PR, check the return value here and // TODO(roth): In a subsequent PR, check the return value here and
// decide whether or not to retry. Note that we should only // decide whether or not to retry. Note that we should only
// record failures whose statuses match the configured retryable // record failures whose statuses match the configured retryable
// or non-fatal status codes. // or non-fatal status codes.
grpc_server_retry_throttle_data_record_failure( grpc_server_retry_throttle_data_record_failure(
&chand->retry_throttle_data); calld->retry_throttle_data);
} }
} }
grpc_closure_run(exec_ctx, calld->original_on_complete, grpc_closure_run(exec_ctx, calld->original_on_complete,
@ -1160,14 +1165,13 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op *op = arg; grpc_transport_stream_op *op = arg;
grpc_call_element *elem = op->handler_private.args[0]; grpc_call_element *elem = op->handler_private.args[0];
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
if (op->recv_trailing_metadata != NULL) { if (op->recv_trailing_metadata != NULL) {
GPR_ASSERT(op->on_complete != NULL); GPR_ASSERT(op->on_complete != NULL);
calld->original_on_complete = op->on_complete; calld->original_on_complete = op->on_complete;
grpc_closure_init(&calld->on_complete, on_complete_locked, elem, grpc_closure_init(&calld->on_complete, on_complete_locked, elem,
grpc_combiner_scheduler(chand->combiner, false)); grpc_schedule_on_exec_ctx);
op->on_complete = &calld->on_complete; op->on_complete = &calld->on_complete;
} }

@ -64,26 +64,18 @@ static void get_replacement_throttle_data_if_needed(
(grpc_server_retry_throttle_data*)gpr_atm_acq_load( (grpc_server_retry_throttle_data*)gpr_atm_acq_load(
&(*throttle_data)->replacement); &(*throttle_data)->replacement);
if (new_throttle_data == NULL) return; if (new_throttle_data == NULL) return;
// Reset *throttle_data to its replacement, updating refcounts as
// appropriate.
// Note: It's safe to do this here, because the caller ensures that
// this will only be called with a given value of throttle_data from
// one thread at a time.
grpc_server_retry_throttle_data_ref(new_throttle_data);
grpc_server_retry_throttle_data* old_throttle_data = *throttle_data;
*throttle_data = new_throttle_data; *throttle_data = new_throttle_data;
grpc_server_retry_throttle_data_unref(old_throttle_data);
} }
} }
bool grpc_server_retry_throttle_data_record_failure( bool grpc_server_retry_throttle_data_record_failure(
grpc_server_retry_throttle_data** throttle_data) { grpc_server_retry_throttle_data* throttle_data) {
// First, check if we are stale and need to be replaced. // First, check if we are stale and need to be replaced.
get_replacement_throttle_data_if_needed(throttle_data); get_replacement_throttle_data_if_needed(&throttle_data);
// We decrement milli_tokens by 1000 (1 token) for each failure. // We decrement milli_tokens by 1000 (1 token) for each failure.
const int delta = -1000; const int delta = -1000;
const int old_value = (int)gpr_atm_full_fetch_add( const int old_value = (int)gpr_atm_full_fetch_add(
&(*throttle_data)->milli_tokens, (gpr_atm)delta); &throttle_data->milli_tokens, (gpr_atm)delta);
// If the above change takes us below 0, then re-add the excess. Note // If the above change takes us below 0, then re-add the excess. Note
// that between these two atomic operations, the value will be // that between these two atomic operations, the value will be
// artificially low by as much as 1000, but this window should be // artificially low by as much as 1000, but this window should be
@ -91,41 +83,42 @@ bool grpc_server_retry_throttle_data_record_failure(
int new_value = old_value - 1000; int new_value = old_value - 1000;
if (new_value < 0) { if (new_value < 0) {
const int excess_value = new_value - (old_value < 0 ? old_value : 0); const int excess_value = new_value - (old_value < 0 ? old_value : 0);
gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, gpr_atm_full_fetch_add(&throttle_data->milli_tokens,
(gpr_atm)-excess_value); (gpr_atm)-excess_value);
new_value = 0; new_value = 0;
} }
// Retries are allowed as long as the new value is above the threshold // Retries are allowed as long as the new value is above the threshold
// (max_milli_tokens / 2). // (max_milli_tokens / 2).
return new_value > (*throttle_data)->max_milli_tokens / 2; return new_value > throttle_data->max_milli_tokens / 2;
} }
void grpc_server_retry_throttle_data_record_success( void grpc_server_retry_throttle_data_record_success(
grpc_server_retry_throttle_data** throttle_data) { grpc_server_retry_throttle_data* throttle_data) {
// First, check if we are stale and need to be replaced. // First, check if we are stale and need to be replaced.
get_replacement_throttle_data_if_needed(throttle_data); get_replacement_throttle_data_if_needed(&throttle_data);
// We increment milli_tokens by milli_token_ratio for each success. // We increment milli_tokens by milli_token_ratio for each success.
const int delta = (*throttle_data)->milli_token_ratio; const int delta = throttle_data->milli_token_ratio;
const int old_value = (int)gpr_atm_full_fetch_add( const int old_value = (int)gpr_atm_full_fetch_add(
&(*throttle_data)->milli_tokens, (gpr_atm)delta); &throttle_data->milli_tokens, (gpr_atm)delta);
// If the above change takes us over max_milli_tokens, then subtract // If the above change takes us over max_milli_tokens, then subtract
// the excess. Note that between these two atomic operations, the // the excess. Note that between these two atomic operations, the
// value will be artificially high by as much as milli_token_ratio, // value will be artificially high by as much as milli_token_ratio,
// but this window should be brief. // but this window should be brief.
const int new_value = old_value + (*throttle_data)->milli_token_ratio; const int new_value = old_value + throttle_data->milli_token_ratio;
if (new_value > (*throttle_data)->max_milli_tokens) { if (new_value > throttle_data->max_milli_tokens) {
const int excess_value = const int excess_value =
new_value - (old_value > (*throttle_data)->max_milli_tokens new_value - (old_value > throttle_data->max_milli_tokens
? old_value ? old_value
: (*throttle_data)->max_milli_tokens); : throttle_data->max_milli_tokens);
gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, gpr_atm_full_fetch_add(&throttle_data->milli_tokens,
(gpr_atm)-excess_value); (gpr_atm)-excess_value);
} }
} }
void grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref(
grpc_server_retry_throttle_data* throttle_data) { grpc_server_retry_throttle_data* throttle_data) {
gpr_ref(&throttle_data->refs); gpr_ref(&throttle_data->refs);
return throttle_data;
} }
void grpc_server_retry_throttle_data_unref( void grpc_server_retry_throttle_data_unref(
@ -189,8 +182,7 @@ static void destroy_server_retry_throttle_data(void* value) {
static void* copy_server_retry_throttle_data(void* value) { static void* copy_server_retry_throttle_data(void* value) {
grpc_server_retry_throttle_data* throttle_data = value; grpc_server_retry_throttle_data* throttle_data = value;
grpc_server_retry_throttle_data_ref(throttle_data); return grpc_server_retry_throttle_data_ref(throttle_data);
return value;
} }
static const gpr_avl_vtable avl_vtable = { static const gpr_avl_vtable avl_vtable = {

@ -40,17 +40,13 @@
typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data;
/// Records a failure. Returns true if it's okay to send a retry. /// Records a failure. Returns true if it's okay to send a retry.
/// Updates \a throttle_data if the original value is stale and has been
/// replaced. Not thread safe; caller must synchronize.
bool grpc_server_retry_throttle_data_record_failure( bool grpc_server_retry_throttle_data_record_failure(
grpc_server_retry_throttle_data** throttle_data); grpc_server_retry_throttle_data* throttle_data);
/// Records a success. /// Records a success.
/// Updates \a throttle_data if the original value is stale and has been
/// replaced. Not thread safe; caller must synchronize.
void grpc_server_retry_throttle_data_record_success( void grpc_server_retry_throttle_data_record_success(
grpc_server_retry_throttle_data** throttle_data); grpc_server_retry_throttle_data* throttle_data);
void grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref(
grpc_server_retry_throttle_data* throttle_data); grpc_server_retry_throttle_data* throttle_data);
void grpc_server_retry_throttle_data_unref( void grpc_server_retry_throttle_data_unref(
grpc_server_retry_throttle_data* throttle_data); grpc_server_retry_throttle_data* throttle_data);

Loading…
Cancel
Save