From d6d192d00591ba278115e73d264f279fe248d544 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 23 Feb 2017 08:58:42 -0800 Subject: [PATCH 1/6] Retry throttling implementation. --- BUILD | 2 + CMakeLists.txt | 4 + Makefile | 4 + binding.gyp | 1 + build.yaml | 2 + config.m4 | 1 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + package.xml | 2 + src/core/ext/client_channel/client_channel.c | 125 ++++++++- .../client_channel/client_channel_plugin.c | 3 + src/core/ext/client_channel/retry_throttle.c | 242 ++++++++++++++++++ src/core/ext/client_channel/retry_throttle.h | 69 +++++ src/core/lib/transport/service_config.c | 12 + src/core/lib/transport/service_config.h | 6 + src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.core.internal | 2 + .../generated/sources_and_headers.json | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 6 + .../grpc_unsecure/grpc_unsecure.vcxproj | 3 + .../grpc_unsecure.vcxproj.filters | 6 + 22 files changed, 496 insertions(+), 6 deletions(-) create mode 100644 src/core/ext/client_channel/retry_throttle.c create mode 100644 src/core/ext/client_channel/retry_throttle.h diff --git a/BUILD b/BUILD index 9fee9085723..2df30b81802 100644 --- a/BUILD +++ b/BUILD @@ -692,6 +692,7 @@ grpc_cc_library( "src/core/ext/client_channel/resolver.c", "src/core/ext/client_channel/resolver_factory.c", "src/core/ext/client_channel/resolver_registry.c", + "src/core/ext/client_channel/retry_throttle.c", "src/core/ext/client_channel/subchannel.c", "src/core/ext/client_channel/subchannel_index.c", "src/core/ext/client_channel/uri_parser.c", @@ -712,6 +713,7 @@ grpc_cc_library( "src/core/ext/client_channel/resolver.h", "src/core/ext/client_channel/resolver_factory.h", "src/core/ext/client_channel/resolver_registry.h", + "src/core/ext/client_channel/retry_throttle.h", "src/core/ext/client_channel/subchannel.h", "src/core/ext/client_channel/subchannel_index.h", "src/core/ext/client_channel/uri_parser.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index ca0a668f580..ebe28f55bb4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1017,6 +1017,7 @@ add_library(grpc src/core/ext/client_channel/resolver.c src/core/ext/client_channel/resolver_factory.c src/core/ext/client_channel/resolver_registry.c + src/core/ext/client_channel/retry_throttle.c src/core/ext/client_channel/subchannel.c src/core/ext/client_channel/subchannel_index.c src/core/ext/client_channel/uri_parser.c @@ -1299,6 +1300,7 @@ add_library(grpc_cronet src/core/ext/client_channel/resolver.c src/core/ext/client_channel/resolver_factory.c src/core/ext/client_channel/resolver_registry.c + src/core/ext/client_channel/retry_throttle.c src/core/ext/client_channel/subchannel.c src/core/ext/client_channel/subchannel_index.c src/core/ext/client_channel/uri_parser.c @@ -1849,6 +1851,7 @@ add_library(grpc_unsecure src/core/ext/client_channel/resolver.c src/core/ext/client_channel/resolver_factory.c src/core/ext/client_channel/resolver_registry.c + src/core/ext/client_channel/retry_throttle.c src/core/ext/client_channel/subchannel.c src/core/ext/client_channel/subchannel_index.c src/core/ext/client_channel/uri_parser.c @@ -2427,6 +2430,7 @@ add_library(grpc++_cronet src/core/ext/client_channel/resolver.c src/core/ext/client_channel/resolver_factory.c src/core/ext/client_channel/resolver_registry.c + src/core/ext/client_channel/retry_throttle.c src/core/ext/client_channel/subchannel.c src/core/ext/client_channel/subchannel_index.c src/core/ext/client_channel/uri_parser.c diff --git a/Makefile b/Makefile index 93486bd2e4c..4a4c8733844 100644 --- a/Makefile +++ b/Makefile @@ -2865,6 +2865,7 @@ LIBGRPC_SRC = \ src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ @@ -3150,6 +3151,7 @@ LIBGRPC_CRONET_SRC = \ src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ @@ -3683,6 +3685,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ @@ -4263,6 +4266,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ diff --git a/binding.gyp b/binding.gyp index 8ff3d8c1a34..5aa6ee8d234 100644 --- a/binding.gyp +++ b/binding.gyp @@ -790,6 +790,7 @@ 'src/core/ext/client_channel/resolver.c', 'src/core/ext/client_channel/resolver_factory.c', 'src/core/ext/client_channel/resolver_registry.c', + 'src/core/ext/client_channel/retry_throttle.c', 'src/core/ext/client_channel/subchannel.c', 'src/core/ext/client_channel/subchannel_index.c', 'src/core/ext/client_channel/uri_parser.c', diff --git a/build.yaml b/build.yaml index 55f011129a5..f72a992a602 100644 --- a/build.yaml +++ b/build.yaml @@ -416,6 +416,7 @@ filegroups: - src/core/ext/client_channel/resolver.h - src/core/ext/client_channel/resolver_factory.h - src/core/ext/client_channel/resolver_registry.h + - src/core/ext/client_channel/retry_throttle.h - src/core/ext/client_channel/subchannel.h - src/core/ext/client_channel/subchannel_index.h - src/core/ext/client_channel/uri_parser.h @@ -438,6 +439,7 @@ filegroups: - src/core/ext/client_channel/resolver.c - src/core/ext/client_channel/resolver_factory.c - src/core/ext/client_channel/resolver_registry.c + - src/core/ext/client_channel/retry_throttle.c - src/core/ext/client_channel/subchannel.c - src/core/ext/client_channel/subchannel_index.c - src/core/ext/client_channel/uri_parser.c diff --git a/config.m4 b/config.m4 index 90536e503ed..5eaf161f096 100644 --- a/config.m4 +++ b/config.m4 @@ -269,6 +269,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 759310346fb..1fb644ba9d5 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -417,6 +417,7 @@ Pod::Spec.new do |s| 'src/core/ext/client_channel/resolver.h', 'src/core/ext/client_channel/resolver_factory.h', 'src/core/ext/client_channel/resolver_registry.h', + 'src/core/ext/client_channel/retry_throttle.h', 'src/core/ext/client_channel/subchannel.h', 'src/core/ext/client_channel/subchannel_index.h', 'src/core/ext/client_channel/uri_parser.h', @@ -636,6 +637,7 @@ Pod::Spec.new do |s| 'src/core/ext/client_channel/resolver.c', 'src/core/ext/client_channel/resolver_factory.c', 'src/core/ext/client_channel/resolver_registry.c', + 'src/core/ext/client_channel/retry_throttle.c', 'src/core/ext/client_channel/subchannel.c', 'src/core/ext/client_channel/subchannel_index.c', 'src/core/ext/client_channel/uri_parser.c', @@ -851,6 +853,7 @@ Pod::Spec.new do |s| 'src/core/ext/client_channel/resolver.h', 'src/core/ext/client_channel/resolver_factory.h', 'src/core/ext/client_channel/resolver_registry.h', + 'src/core/ext/client_channel/retry_throttle.h', 'src/core/ext/client_channel/subchannel.h', 'src/core/ext/client_channel/subchannel_index.h', 'src/core/ext/client_channel/uri_parser.h', diff --git a/grpc.gemspec b/grpc.gemspec index 82c9d680801..fae773d7468 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -334,6 +334,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/client_channel/resolver.h ) s.files += %w( src/core/ext/client_channel/resolver_factory.h ) s.files += %w( src/core/ext/client_channel/resolver_registry.h ) + s.files += %w( src/core/ext/client_channel/retry_throttle.h ) s.files += %w( src/core/ext/client_channel/subchannel.h ) s.files += %w( src/core/ext/client_channel/subchannel_index.h ) s.files += %w( src/core/ext/client_channel/uri_parser.h ) @@ -553,6 +554,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/client_channel/resolver.c ) s.files += %w( src/core/ext/client_channel/resolver_factory.c ) s.files += %w( src/core/ext/client_channel/resolver_registry.c ) + s.files += %w( src/core/ext/client_channel/retry_throttle.c ) s.files += %w( src/core/ext/client_channel/subchannel.c ) s.files += %w( src/core/ext/client_channel/subchannel_index.c ) s.files += %w( src/core/ext/client_channel/uri_parser.c ) diff --git a/package.xml b/package.xml index e4db6a7d2e5..d4a05f7e872 100644 --- a/package.xml +++ b/package.xml @@ -343,6 +343,7 @@ + @@ -562,6 +563,7 @@ + diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 6cbc333b832..1cc2b9455fe 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -47,6 +47,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -165,6 +166,8 @@ typedef struct client_channel_channel_data { grpc_combiner *combiner; /** currently active load balancer */ grpc_lb_policy *lb_policy; + /** retry throttle data */ + grpc_server_retry_throttle_data *retry_throttle_data; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -260,6 +263,64 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +typedef struct { + char *server_name; + grpc_server_retry_throttle_data *retry_throttle_data; +} service_config_parsing_state; + +static void parse_retry_throttle_params(const grpc_json *field, void *arg) { + service_config_parsing_state *parsing_state = arg; + if (strcmp(field->key, "retryThrottling") == 0) { + if (parsing_state->retry_throttle_data != NULL) return; // Duplicate. + if (field->type != GRPC_JSON_OBJECT) return; + int max_milli_tokens = 0; + int milli_token_ratio = 0; + for (grpc_json *sub_field = field->child; sub_field != NULL; + sub_field = sub_field->next) { + if (sub_field->key == NULL) continue; + if (strcmp(sub_field->key, "maxTokens") == 0) { + if (max_milli_tokens != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); + if (max_milli_tokens == -1) return; + max_milli_tokens *= 1000; + } else if (strcmp(sub_field->key, "tokenRatio") == 0) { + if (milli_token_ratio != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + // We support up to 3 decimal digits. + size_t whole_len = strlen(sub_field->value); + uint32_t multiplier = 1; + uint32_t decimal_value = 0; + const char *decimal_point = strchr(sub_field->value, '.'); + if (decimal_point != NULL) { + whole_len = (size_t)(decimal_point - sub_field->value); + multiplier = 1000; + size_t decimal_len = strlen(decimal_point + 1); + if (decimal_len > 3) decimal_len = 3; + if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, + &decimal_value)) { + return; + } + uint32_t decimal_multiplier = 1; + for (size_t i = 0; i < (3 - decimal_len); ++i) { + decimal_multiplier *= 10; + } + decimal_value *= decimal_multiplier; + } + uint32_t whole_value; + if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, + &whole_value)) { + return; + } + milli_token_ratio = (int)((whole_value * multiplier) + decimal_value); + } + } + parsing_state->retry_throttle_data = + grpc_retry_throttle_map_get_data_for_server( + parsing_state->server_name, max_milli_tokens, milli_token_ratio); + } +} + static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; @@ -271,6 +332,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); char *service_config_json = NULL; + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); if (chand->resolver_result != NULL) { // Find LB policy name. @@ -330,6 +393,18 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_service_config *service_config = grpc_service_config_create(service_config_json); if (service_config != NULL) { + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); + GPR_ASSERT(channel_arg != NULL); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + grpc_service_config_parse_global_params( + service_config, parse_retry_throttle_params, &parsing_state); + parsing_state.server_name = NULL; + grpc_uri_destroy(uri); method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, &method_parameters_vtable); @@ -361,6 +436,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, chand->info_service_config_json = service_config_json; } gpr_mu_unlock(&chand->info_mu); + + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } + chand->retry_throttle_data = parsing_state.retry_throttle_data; if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -589,6 +669,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -651,6 +734,9 @@ typedef struct client_channel_call_data { grpc_call_stack *owning_call; grpc_linked_mdelem lb_token_mdelem; + + grpc_closure on_complete; + grpc_closure *original_on_complete; } call_data; grpc_subchannel_call *grpc_client_channel_get_subchannel_call( @@ -977,20 +1063,47 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } -static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); +static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = arg; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + if (error == GRPC_ERROR_NONE) { + grpc_server_retry_throttle_data_record_success( + &chand->retry_throttle_data); + } else { + // TODO(roth): In a subsequent PR, check the return value here and + // decide whether or not to retry. + grpc_server_retry_throttle_data_record_failure( + &chand->retry_throttle_data); + } + } + grpc_closure_run(exec_ctx, calld->original_on_complete, error); +} + +static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; + channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (op->recv_trailing_metadata != NULL) { + GPR_ASSERT(op->on_complete != NULL); + calld->original_on_complete = op->on_complete; + grpc_closure_init(&calld->on_complete, on_complete_locked, elem, + grpc_combiner_scheduler(chand->combiner, false)); + op->on_complete = &calld->on_complete; + } + start_transport_stream_op_locked_inner(exec_ctx, op, elem); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "start_transport_stream_op"); - GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); + GPR_TIMER_END("start_transport_stream_op_locked", 0); } /* The logic here is fairly complicated, due to (a) the fact that we @@ -1030,7 +1143,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, - cc_start_transport_stream_op_locked, op, + start_transport_stream_op_locked, op, grpc_combiner_scheduler(chand->combiner, false)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op", 0); diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 6f9df3e386e..c8d2105b47f 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -43,6 +43,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" @@ -82,6 +83,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, void grpc_client_channel_init(void) { grpc_lb_policy_registry_init(); grpc_resolver_registry_init(); + grpc_retry_throttle_map_init(); grpc_proxy_mapper_registry_init(); grpc_register_http_proxy_mapper(); grpc_subchannel_index_init(); @@ -96,6 +98,7 @@ void grpc_client_channel_shutdown(void) { grpc_subchannel_index_shutdown(); grpc_channel_init_shutdown(); grpc_proxy_mapper_registry_shutdown(); + grpc_retry_throttle_map_shutdown(); grpc_resolver_registry_shutdown(); grpc_lb_policy_registry_shutdown(); } diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c new file mode 100644 index 00000000000..2aa52e49033 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.c @@ -0,0 +1,242 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_channel/retry_throttle.h" + +#include +#include + +#include +#include +#include +#include +#include + +// +// server_retry_throttle_data +// + +struct grpc_server_retry_throttle_data { + gpr_refcount refs; + int max_milli_tokens; + int milli_token_ratio; + gpr_atm milli_tokens; + // A pointer to the replacement for this grpc_server_retry_throttle_data + // entry. If non-NULL, then this entry is stale and must not be used. + // We hold a reference to the replacement. + gpr_atm replacement; +}; + +static void get_replacement_throttle_data_if_needed( + grpc_server_retry_throttle_data** throttle_data) { + while (true) { + grpc_server_retry_throttle_data* new_throttle_data = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &(*throttle_data)->replacement); + if (new_throttle_data == NULL) return; + // Reset *throttle_data to its replacement, updating refcounts as + // appropriate. + // Note: It's safe to do this here, because the caller ensures that + // this will only be called with a given value of throttle_data from + // one thread at a time. + grpc_server_retry_throttle_data_ref(new_throttle_data); + grpc_server_retry_throttle_data* old_throttle_data = *throttle_data; + *throttle_data = new_throttle_data; + grpc_server_retry_throttle_data_unref(old_throttle_data); + } +} + +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data** throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(throttle_data); + // We decrement milli_tokens by 1000 (1 token) for each failure. + const int delta = -1000; + const int old_value = (int)gpr_atm_full_fetch_add( + &(*throttle_data)->milli_tokens, (gpr_atm)delta); + // If the above change takes us below 0, then re-add the excess. Note + // that between these two atomic operations, the value will be + // artificially low by as much as 1000, but this window should be + // brief. + int new_value = old_value - 1000; + if (new_value < 0) { + const int excess_value = new_value - (old_value < 0 ? old_value : 0); + gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + (gpr_atm)-excess_value); + new_value = 0; + } + // Retries are allowed as long as the new value is above the threshold + // (max_milli_tokens / 2). + return new_value > (*throttle_data)->max_milli_tokens / 2; +} + +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data** throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(throttle_data); + // We increment milli_tokens by milli_token_ratio for each success. + const int delta = (*throttle_data)->milli_token_ratio; + const int old_value = (int)gpr_atm_full_fetch_add( + &(*throttle_data)->milli_tokens, (gpr_atm)delta); + // If the above change takes us over max_milli_tokens, then subtract + // the excess. Note that between these two atomic operations, the + // value will be artificially high by as much as milli_token_ratio, + // but this window should be brief. + const int new_value = old_value + (*throttle_data)->milli_token_ratio; + if (new_value > (*throttle_data)->max_milli_tokens) { + const int excess_value = + new_value - (old_value > (*throttle_data)->max_milli_tokens + ? old_value + : (*throttle_data)->max_milli_tokens); + gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + (gpr_atm)-excess_value); + } +} + +void grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data) { + gpr_ref(&throttle_data->refs); +} + +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data) { + if (gpr_unref(&throttle_data->refs)) { + grpc_server_retry_throttle_data* replacement = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &throttle_data->replacement); + if (replacement != NULL) { + grpc_server_retry_throttle_data_unref(replacement); + } + gpr_free(throttle_data); + } +} + +static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( + int max_milli_tokens, int milli_token_ratio, + grpc_server_retry_throttle_data* old_throttle_data) { + grpc_server_retry_throttle_data* throttle_data = + gpr_malloc(sizeof(*throttle_data)); + memset(throttle_data, 0, sizeof(*throttle_data)); + gpr_ref_init(&throttle_data->refs, 1); + throttle_data->max_milli_tokens = max_milli_tokens; + throttle_data->milli_token_ratio = milli_token_ratio; + int initial_milli_tokens = max_milli_tokens; + // If there was a pre-existing entry for this server name, initialize + // the token count by scaling proportionately to the old data. This + // ensures that if we're already throttling retries on the old scale, + // we will start out doing the same thing on the new one. + if (old_throttle_data != NULL) { + double token_fraction = + (int)gpr_atm_acq_load(&old_throttle_data->milli_tokens) / + (double)old_throttle_data->max_milli_tokens; + initial_milli_tokens = (int)(token_fraction * max_milli_tokens); + } + gpr_atm_rel_store(&throttle_data->milli_tokens, + (gpr_atm)initial_milli_tokens); + // If there was a pre-existing entry, mark it as stale and give it a + // pointer to the new entry, which is its replacement. + if (old_throttle_data != NULL) { + grpc_server_retry_throttle_data_ref(throttle_data); + gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data); + } + return throttle_data; +} + +// +// avl vtable for string -> server_retry_throttle_data map +// + +static void* copy_server_name(void* key) { return gpr_strdup(key); } + +static long compare_server_name(void* key1, void* key2) { + return strcmp(key1, key2); +} + +static void destroy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_unref(throttle_data); +} + +static void* copy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_ref(throttle_data); + return value; +} + +static const gpr_avl_vtable avl_vtable = { + gpr_free /* destroy_key */, copy_server_name, compare_server_name, + destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; + +// +// server_retry_throttle_map +// + +static gpr_mu g_mu; +static gpr_avl g_avl; + +void grpc_retry_throttle_map_init() { + gpr_mu_init(&g_mu); + g_avl = gpr_avl_create(&avl_vtable); +} + +void grpc_retry_throttle_map_shutdown() { + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_avl); +} + +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio) { + gpr_mu_lock(&g_mu); + grpc_server_retry_throttle_data* throttle_data = + gpr_avl_get(g_avl, (char*)server_name); + if (throttle_data == NULL) { + // Entry not found. Create a new one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, NULL); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + if (throttle_data->max_milli_tokens != max_milli_tokens || + throttle_data->milli_token_ratio != milli_token_ratio) { + // Entry found but with old parameters. Create a new one based on + // the original one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, throttle_data); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + // Entry found. Increase refcount. + grpc_server_retry_throttle_data_ref(throttle_data); + } + } + gpr_mu_unlock(&g_mu); + return throttle_data; +} diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h new file mode 100644 index 00000000000..4209bb7fb66 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.h @@ -0,0 +1,69 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H + +#include + +/// Tracks retry throttling data for an individual server name. +typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; + +/// Records a failure. Returns true if it's okay to send a retry. +/// Updates \a throttle_data if the original value is stale and has been +/// replaced. Not thread safe; caller must synchronize. +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data** throttle_data); +/// Records a success. +/// Updates \a throttle_data if the original value is stale and has been +/// replaced. Not thread safe; caller must synchronize. +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data** throttle_data); + +void grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data); +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data); + +/// Initializes global map of failure data for each server name. +void grpc_retry_throttle_map_init(); +/// Shuts down global map of failure data for each server name. +void grpc_retry_throttle_map_shutdown(); + +/// Returns a reference to the failure data for \a server_name, creating +/// a new entry if needed. +/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref(). +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio); + +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H */ diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 12da2a88feb..1195f75044a 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -93,6 +93,18 @@ void grpc_service_config_destroy(grpc_service_config* service_config) { gpr_free(service_config); } +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg) { + const grpc_json* json = service_config->json_tree; + if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) return; + if (strcmp(field->key, "methodConfig") == 0) continue; + process_json(field, arg); + } +} + const char* grpc_service_config_get_lb_policy_name( const grpc_service_config* service_config) { const grpc_json* json = service_config->json_tree; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index cd739a593c2..ebfc59b5347 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -42,6 +42,12 @@ typedef struct grpc_service_config grpc_service_config; grpc_service_config* grpc_service_config_create(const char* json_string); void grpc_service_config_destroy(grpc_service_config* service_config); +/// Invokes \a process_json() for each global parameter in the service +/// config. \a arg is passed as the second argument to \a process_json(). +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg); + /// Gets the LB policy name from \a service_config. /// Returns NULL if no LB policy name was specified. /// Caller does NOT take ownership. diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a9f20e6d2a8..94d6e46cae8 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -263,6 +263,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/client_channel/resolver.c', 'src/core/ext/client_channel/resolver_factory.c', 'src/core/ext/client_channel/resolver_registry.c', + 'src/core/ext/client_channel/retry_throttle.c', 'src/core/ext/client_channel/subchannel.c', 'src/core/ext/client_channel/subchannel_index.c', 'src/core/ext/client_channel/uri_parser.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 10801254ef9..1237bdfe3bc 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -925,6 +925,8 @@ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_factory.h \ src/core/ext/client_channel/resolver_registry.c \ src/core/ext/client_channel/resolver_registry.h \ +src/core/ext/client_channel/retry_throttle.c \ +src/core/ext/client_channel/retry_throttle.h \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel.h \ src/core/ext/client_channel/subchannel_index.c \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 462353cb50a..03dbb6cc6f4 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7467,6 +7467,7 @@ "src/core/ext/client_channel/resolver.h", "src/core/ext/client_channel/resolver_factory.h", "src/core/ext/client_channel/resolver_registry.h", + "src/core/ext/client_channel/retry_throttle.h", "src/core/ext/client_channel/subchannel.h", "src/core/ext/client_channel/subchannel_index.h", "src/core/ext/client_channel/uri_parser.h" @@ -7508,6 +7509,8 @@ "src/core/ext/client_channel/resolver_factory.h", "src/core/ext/client_channel/resolver_registry.c", "src/core/ext/client_channel/resolver_registry.h", + "src/core/ext/client_channel/retry_throttle.c", + "src/core/ext/client_channel/retry_throttle.h", "src/core/ext/client_channel/subchannel.c", "src/core/ext/client_channel/subchannel.h", "src/core/ext/client_channel/subchannel_index.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index fde60be3e20..695524913db 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -466,6 +466,7 @@ + @@ -876,6 +877,8 @@ + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 8edbbc22bed..2fc34a8525f 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -568,6 +568,9 @@ src\core\ext\client_channel + + src\core\ext\client_channel + src\core\ext\client_channel @@ -1271,6 +1274,9 @@ src\core\ext\client_channel + + src\core\ext\client_channel + src\core\ext\client_channel diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 22f4740b8fa..d15c6924e02 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -432,6 +432,7 @@ + @@ -793,6 +794,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 5021cb47d8d..8e4835ee148 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -496,6 +496,9 @@ src\core\ext\client_channel + + src\core\ext\client_channel + src\core\ext\client_channel @@ -1109,6 +1112,9 @@ src\core\ext\client_channel + + src\core\ext\client_channel + src\core\ext\client_channel From b33225678a656c50df8f6437123355bd178e0e30 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 23 Feb 2017 14:38:02 -0800 Subject: [PATCH 2/6] Code review changes. --- src/core/ext/client_channel/client_channel.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 1cc2b9455fe..967709bf741 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -277,7 +277,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { int milli_token_ratio = 0; for (grpc_json *sub_field = field->child; sub_field != NULL; sub_field = sub_field->next) { - if (sub_field->key == NULL) continue; + if (sub_field->key == NULL) return; if (strcmp(sub_field->key, "maxTokens") == 0) { if (max_milli_tokens != 0) return; // Duplicate. if (sub_field->type != GRPC_JSON_NUMBER) return; @@ -313,6 +313,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { return; } milli_token_ratio = (int)((whole_value * multiplier) + decimal_value); + if (milli_token_ratio <= 0) return; } } parsing_state->retry_throttle_data = @@ -1074,7 +1075,9 @@ static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, &chand->retry_throttle_data); } else { // TODO(roth): In a subsequent PR, check the return value here and - // decide whether or not to retry. + // decide whether or not to retry. Note that we should only + // record failures whose statuses match the configured retryable + // or non-fatal status codes. grpc_server_retry_throttle_data_record_failure( &chand->retry_throttle_data); } From 95039b57dc812dff2c0edfd80f6c09179afabc97 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 24 Feb 2017 07:59:45 -0800 Subject: [PATCH 3/6] Fix refcounting bug. --- src/core/ext/client_channel/client_channel.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 967709bf741..c2982004653 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -1082,7 +1082,8 @@ static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, &chand->retry_throttle_data); } } - grpc_closure_run(exec_ctx, calld->original_on_complete, error); + grpc_closure_run(exec_ctx, calld->original_on_complete, + GRPC_ERROR_REF(error)); } static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, From 9ccbc4d5e51bcaa0adc3ca6837d93b7c162b3ca6 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 15 Mar 2017 08:30:04 -0700 Subject: [PATCH 4/6] Don't use combiner lock for on_complete callback. --- src/core/ext/client_channel/client_channel.c | 20 ++++++---- src/core/ext/client_channel/retry_throttle.c | 42 ++++++++------------ src/core/ext/client_channel/retry_throttle.h | 10 ++--- 3 files changed, 32 insertions(+), 40 deletions(-) diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index ef273669e84..e0b84ddd660 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -423,7 +423,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); GPR_ASSERT(channel_arg != NULL); GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true); + grpc_uri *uri = + grpc_uri_parse(exec_ctx, channel_arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); parsing_state.server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path; @@ -738,6 +739,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; + grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; grpc_error *cancel_error; @@ -814,7 +816,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { gpr_free(ops); } -// Sets calld->method_params. +// Sets calld->method_params and calld->retry_throttle_data. // If the method params specify a timeout, populates // *per_method_deadline and returns true. static bool set_call_method_params_from_service_config_locked( @@ -822,6 +824,10 @@ static bool set_call_method_params_from_service_config_locked( gpr_timespec *per_method_deadline) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + calld->retry_throttle_data = + grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { calld->method_params = grpc_method_config_table_get( exec_ctx, chand->method_params_table, calld->path); @@ -1135,19 +1141,18 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (chand->retry_throttle_data != NULL) { + if (calld->retry_throttle_data != NULL) { if (error == GRPC_ERROR_NONE) { grpc_server_retry_throttle_data_record_success( - &chand->retry_throttle_data); + calld->retry_throttle_data); } else { // TODO(roth): In a subsequent PR, check the return value here and // decide whether or not to retry. Note that we should only // record failures whose statuses match the configured retryable // or non-fatal status codes. grpc_server_retry_throttle_data_record_failure( - &chand->retry_throttle_data); + calld->retry_throttle_data); } } grpc_closure_run(exec_ctx, calld->original_on_complete, @@ -1160,14 +1165,13 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; if (op->recv_trailing_metadata != NULL) { GPR_ASSERT(op->on_complete != NULL); calld->original_on_complete = op->on_complete; grpc_closure_init(&calld->on_complete, on_complete_locked, elem, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_schedule_on_exec_ctx); op->on_complete = &calld->on_complete; } diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c index 2aa52e49033..7b813c33df4 100644 --- a/src/core/ext/client_channel/retry_throttle.c +++ b/src/core/ext/client_channel/retry_throttle.c @@ -64,26 +64,18 @@ static void get_replacement_throttle_data_if_needed( (grpc_server_retry_throttle_data*)gpr_atm_acq_load( &(*throttle_data)->replacement); if (new_throttle_data == NULL) return; - // Reset *throttle_data to its replacement, updating refcounts as - // appropriate. - // Note: It's safe to do this here, because the caller ensures that - // this will only be called with a given value of throttle_data from - // one thread at a time. - grpc_server_retry_throttle_data_ref(new_throttle_data); - grpc_server_retry_throttle_data* old_throttle_data = *throttle_data; *throttle_data = new_throttle_data; - grpc_server_retry_throttle_data_unref(old_throttle_data); } } bool grpc_server_retry_throttle_data_record_failure( - grpc_server_retry_throttle_data** throttle_data) { + grpc_server_retry_throttle_data* throttle_data) { // First, check if we are stale and need to be replaced. - get_replacement_throttle_data_if_needed(throttle_data); + get_replacement_throttle_data_if_needed(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. const int delta = -1000; const int old_value = (int)gpr_atm_full_fetch_add( - &(*throttle_data)->milli_tokens, (gpr_atm)delta); + &throttle_data->milli_tokens, (gpr_atm)delta); // If the above change takes us below 0, then re-add the excess. Note // that between these two atomic operations, the value will be // artificially low by as much as 1000, but this window should be @@ -91,41 +83,42 @@ bool grpc_server_retry_throttle_data_record_failure( int new_value = old_value - 1000; if (new_value < 0) { const int excess_value = new_value - (old_value < 0 ? old_value : 0); - gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + gpr_atm_full_fetch_add(&throttle_data->milli_tokens, (gpr_atm)-excess_value); new_value = 0; } // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). - return new_value > (*throttle_data)->max_milli_tokens / 2; + return new_value > throttle_data->max_milli_tokens / 2; } void grpc_server_retry_throttle_data_record_success( - grpc_server_retry_throttle_data** throttle_data) { + grpc_server_retry_throttle_data* throttle_data) { // First, check if we are stale and need to be replaced. - get_replacement_throttle_data_if_needed(throttle_data); + get_replacement_throttle_data_if_needed(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. - const int delta = (*throttle_data)->milli_token_ratio; + const int delta = throttle_data->milli_token_ratio; const int old_value = (int)gpr_atm_full_fetch_add( - &(*throttle_data)->milli_tokens, (gpr_atm)delta); + &throttle_data->milli_tokens, (gpr_atm)delta); // If the above change takes us over max_milli_tokens, then subtract // the excess. Note that between these two atomic operations, the // value will be artificially high by as much as milli_token_ratio, // but this window should be brief. - const int new_value = old_value + (*throttle_data)->milli_token_ratio; - if (new_value > (*throttle_data)->max_milli_tokens) { + const int new_value = old_value + throttle_data->milli_token_ratio; + if (new_value > throttle_data->max_milli_tokens) { const int excess_value = - new_value - (old_value > (*throttle_data)->max_milli_tokens + new_value - (old_value > throttle_data->max_milli_tokens ? old_value - : (*throttle_data)->max_milli_tokens); - gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + : throttle_data->max_milli_tokens); + gpr_atm_full_fetch_add(&throttle_data->milli_tokens, (gpr_atm)-excess_value); } } -void grpc_server_retry_throttle_data_ref( +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* throttle_data) { gpr_ref(&throttle_data->refs); + return throttle_data; } void grpc_server_retry_throttle_data_unref( @@ -189,8 +182,7 @@ static void destroy_server_retry_throttle_data(void* value) { static void* copy_server_retry_throttle_data(void* value) { grpc_server_retry_throttle_data* throttle_data = value; - grpc_server_retry_throttle_data_ref(throttle_data); - return value; + return grpc_server_retry_throttle_data_ref(throttle_data); } static const gpr_avl_vtable avl_vtable = { diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h index 4209bb7fb66..f9971faf651 100644 --- a/src/core/ext/client_channel/retry_throttle.h +++ b/src/core/ext/client_channel/retry_throttle.h @@ -40,17 +40,13 @@ typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; /// Records a failure. Returns true if it's okay to send a retry. -/// Updates \a throttle_data if the original value is stale and has been -/// replaced. Not thread safe; caller must synchronize. bool grpc_server_retry_throttle_data_record_failure( - grpc_server_retry_throttle_data** throttle_data); + grpc_server_retry_throttle_data* throttle_data); /// Records a success. -/// Updates \a throttle_data if the original value is stale and has been -/// replaced. Not thread safe; caller must synchronize. void grpc_server_retry_throttle_data_record_success( - grpc_server_retry_throttle_data** throttle_data); + grpc_server_retry_throttle_data* throttle_data); -void grpc_server_retry_throttle_data_ref( +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* throttle_data); void grpc_server_retry_throttle_data_unref( grpc_server_retry_throttle_data* throttle_data); From de14410b43cfbed4c1e0b777aa18a764d92d56ad Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 15 Mar 2017 10:11:03 -0700 Subject: [PATCH 5/6] Rename on_complete_locked() to on_complete(). --- src/core/ext/client_channel/client_channel.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index e0b84ddd660..937ebfcb55d 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -1138,8 +1138,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } -static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; call_data *calld = elem->call_data; if (calld->retry_throttle_data != NULL) { @@ -1170,7 +1169,7 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, if (op->recv_trailing_metadata != NULL) { GPR_ASSERT(op->on_complete != NULL); calld->original_on_complete = op->on_complete; - grpc_closure_init(&calld->on_complete, on_complete_locked, elem, + grpc_closure_init(&calld->on_complete, on_complete, elem, grpc_schedule_on_exec_ctx); op->on_complete = &calld->on_complete; } From fecba535d99ec2c819a0d26707047bf2f2f323fa Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 17 Mar 2017 09:50:48 -0700 Subject: [PATCH 6/6] Switch to using a CAS loop to update the token value. --- BUILD | 1 + CMakeLists.txt | 1 + Makefile | 1 + binding.gyp | 1 + build.yaml | 1 + config.m4 | 1 + gRPC-Core.podspec | 1 + grpc.gemspec | 1 + include/grpc/impl/codegen/atm.h | 5 ++ package.xml | 1 + src/core/ext/client_channel/retry_throttle.c | 36 +++----------- src/core/lib/support/atm.c | 47 +++++++++++++++++++ src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.core.internal | 1 + .../generated/sources_and_headers.json | 1 + vsprojects/vcxproj/gpr/gpr.vcxproj | 2 + vsprojects/vcxproj/gpr/gpr.vcxproj.filters | 3 ++ 17 files changed, 75 insertions(+), 30 deletions(-) create mode 100644 src/core/lib/support/atm.c diff --git a/BUILD b/BUILD index 4e1f20c3b21..1fe72c02db1 100644 --- a/BUILD +++ b/BUILD @@ -309,6 +309,7 @@ grpc_cc_library( "src/core/lib/profiling/basic_timers.c", "src/core/lib/profiling/stap_timers.c", "src/core/lib/support/alloc.c", + "src/core/lib/support/atm.c", "src/core/lib/support/avl.c", "src/core/lib/support/backoff.c", "src/core/lib/support/cmdline.c", diff --git a/CMakeLists.txt b/CMakeLists.txt index 8bc07255f15..9e99062f581 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -693,6 +693,7 @@ add_library(gpr src/core/lib/profiling/basic_timers.c src/core/lib/profiling/stap_timers.c src/core/lib/support/alloc.c + src/core/lib/support/atm.c src/core/lib/support/avl.c src/core/lib/support/backoff.c src/core/lib/support/cmdline.c diff --git a/Makefile b/Makefile index 11bac54c796..2f7120987ab 100644 --- a/Makefile +++ b/Makefile @@ -2599,6 +2599,7 @@ LIBGPR_SRC = \ src/core/lib/profiling/basic_timers.c \ src/core/lib/profiling/stap_timers.c \ src/core/lib/support/alloc.c \ + src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/cmdline.c \ diff --git a/binding.gyp b/binding.gyp index f6a04b27f9f..1107f318891 100644 --- a/binding.gyp +++ b/binding.gyp @@ -544,6 +544,7 @@ 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/build.yaml b/build.yaml index ae546cbb308..7b27968c61d 100644 --- a/build.yaml +++ b/build.yaml @@ -101,6 +101,7 @@ filegroups: - src/core/lib/profiling/basic_timers.c - src/core/lib/profiling/stap_timers.c - src/core/lib/support/alloc.c + - src/core/lib/support/atm.c - src/core/lib/support/avl.c - src/core/lib/support/backoff.c - src/core/lib/support/cmdline.c diff --git a/config.m4 b/config.m4 index 5eaf161f096..010401f2fb6 100644 --- a/config.m4 +++ b/config.m4 @@ -39,6 +39,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/profiling/basic_timers.c \ src/core/lib/profiling/stap_timers.c \ src/core/lib/support/alloc.c \ + src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/cmdline.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 2fb00a3afe0..c78e4a70233 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -211,6 +211,7 @@ Pod::Spec.new do |s| 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/grpc.gemspec b/grpc.gemspec index 1ca2446e65c..95aba00fd7c 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -97,6 +97,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/profiling/basic_timers.c ) s.files += %w( src/core/lib/profiling/stap_timers.c ) s.files += %w( src/core/lib/support/alloc.c ) + s.files += %w( src/core/lib/support/atm.c ) s.files += %w( src/core/lib/support/avl.c ) s.files += %w( src/core/lib/support/backoff.c ) s.files += %w( src/core/lib/support/cmdline.c ) diff --git a/include/grpc/impl/codegen/atm.h b/include/grpc/impl/codegen/atm.h index ae00fb0f169..4bd572d6d18 100644 --- a/include/grpc/impl/codegen/atm.h +++ b/include/grpc/impl/codegen/atm.h @@ -92,4 +92,9 @@ #error could not determine platform for atm #endif +/** Adds \a delta to \a *value, clamping the result to the range specified + by \a min and \a max. Returns the new value. */ +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max); + #endif /* GRPC_IMPL_CODEGEN_ATM_H */ diff --git a/package.xml b/package.xml index e29f462d333..83ad2d21298 100644 --- a/package.xml +++ b/package.xml @@ -106,6 +106,7 @@ + diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c index 7b813c33df4..8926c3d7822 100644 --- a/src/core/ext/client_channel/retry_throttle.c +++ b/src/core/ext/client_channel/retry_throttle.c @@ -73,20 +73,9 @@ bool grpc_server_retry_throttle_data_record_failure( // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. - const int delta = -1000; - const int old_value = (int)gpr_atm_full_fetch_add( - &throttle_data->milli_tokens, (gpr_atm)delta); - // If the above change takes us below 0, then re-add the excess. Note - // that between these two atomic operations, the value will be - // artificially low by as much as 1000, but this window should be - // brief. - int new_value = old_value - 1000; - if (new_value < 0) { - const int excess_value = new_value - (old_value < 0 ? old_value : 0); - gpr_atm_full_fetch_add(&throttle_data->milli_tokens, - (gpr_atm)-excess_value); - new_value = 0; - } + const int new_value = (int)gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)-1000, (gpr_atm)0, + (gpr_atm)throttle_data->max_milli_tokens); // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). return new_value > throttle_data->max_milli_tokens / 2; @@ -97,22 +86,9 @@ void grpc_server_retry_throttle_data_record_success( // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. - const int delta = throttle_data->milli_token_ratio; - const int old_value = (int)gpr_atm_full_fetch_add( - &throttle_data->milli_tokens, (gpr_atm)delta); - // If the above change takes us over max_milli_tokens, then subtract - // the excess. Note that between these two atomic operations, the - // value will be artificially high by as much as milli_token_ratio, - // but this window should be brief. - const int new_value = old_value + throttle_data->milli_token_ratio; - if (new_value > throttle_data->max_milli_tokens) { - const int excess_value = - new_value - (old_value > throttle_data->max_milli_tokens - ? old_value - : throttle_data->max_milli_tokens); - gpr_atm_full_fetch_add(&throttle_data->milli_tokens, - (gpr_atm)-excess_value); - } + gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)throttle_data->milli_token_ratio, + (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens); } grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c new file mode 100644 index 00000000000..06e8432caf8 --- /dev/null +++ b/src/core/lib/support/atm.c @@ -0,0 +1,47 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include + +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max) { + gpr_atm current; + gpr_atm new; + do { + current = gpr_atm_no_barrier_load(value); + new = GPR_CLAMP(current + delta, min, max); + if (new == current) break; + } while (!gpr_atm_no_barrier_cas(value, current, new)); + return new; +} diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 94d6e46cae8..da0dba7dfee 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -33,6 +33,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index fbe1f7f78e1..7147d152ef2 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1230,6 +1230,7 @@ src/core/lib/slice/slice_internal.h \ src/core/lib/slice/slice_string_helpers.c \ src/core/lib/slice/slice_string_helpers.h \ src/core/lib/support/alloc.c \ +src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/backoff.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index b2f9078c054..7a6295cf72f 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7311,6 +7311,7 @@ "src/core/lib/profiling/stap_timers.c", "src/core/lib/profiling/timers.h", "src/core/lib/support/alloc.c", + "src/core/lib/support/atm.c", "src/core/lib/support/avl.c", "src/core/lib/support/backoff.c", "src/core/lib/support/backoff.h", diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj b/vsprojects/vcxproj/gpr/gpr.vcxproj index 44c21ddeb31..67ac3b98c5e 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj @@ -208,6 +208,8 @@ + + diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters index a5924a624a8..c49c87ed603 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters @@ -10,6 +10,9 @@ src\core\lib\support + + src\core\lib\support + src\core\lib\support