From dde6afc0c01d2ebac5c5532aaf35e416c72a4245 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 22 Nov 2017 16:31:01 -0800 Subject: [PATCH] C++-ize backoff --- .../client_channel/lb_policy/grpclb/grpclb.cc | 25 ++-- .../resolver/dns/c_ares/dns_resolver_ares.cc | 24 ++-- .../resolver/dns/native/dns_resolver.cc | 24 ++-- .../ext/filters/client_channel/subchannel.cc | 84 +++++++++-- src/core/lib/backoff/backoff.cc | 83 +++++------ src/core/lib/backoff/backoff.h | 132 ++++++++++-------- src/core/lib/support/alloc_new.h | 30 ++++ test/core/backoff/backoff_test.cc | 90 +++++++----- 8 files changed, 310 insertions(+), 182 deletions(-) create mode 100644 src/core/lib/support/alloc_new.h diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 5fb502e2dd9..9404bd70647 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -113,6 +113,7 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" @@ -397,7 +398,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - grpc_backoff lb_call_backoff_state; + grpc_core::Backoff* lb_call_backoff; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -986,6 +987,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); + gpr_free(glb_policy->lb_call_backoff); gpr_free(glb_policy); } @@ -1150,7 +1152,7 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx, } glb_policy->started_picking = true; - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); query_for_backends_locked(exec_ctx, glb_policy); } @@ -1291,8 +1293,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx, } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ grpc_millis next_try = - grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state) - .next_attempt_start_time; + glb_policy->lb_call_backoff->Step(exec_ctx).next_attempt_start_time; if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); @@ -1461,12 +1462,14 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_backoff_init(&glb_policy->lb_call_backoff_state, - GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_GRPCLB_RECONNECT_JITTER, - GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + glb_policy->lb_call_backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; @@ -1573,7 +1576,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, memset(ops, 0, sizeof(ops)); grpc_op* op = ops; if (glb_policy->lb_response_payload != nullptr) { - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); /* Received data from the LB server. Look inside * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 07737b19d23..041e290bb85 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -39,6 +39,7 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" @@ -89,7 +90,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::Backoff* backoff; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; @@ -137,7 +138,7 @@ static void dns_ares_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(exec_ctx, r); } } @@ -272,8 +273,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); - grpc_millis next_try = - grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time; grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -307,7 +307,7 @@ static void dns_ares_next_locked(grpc_exec_ctx* exec_ctx, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(exec_ctx, r); } else { dns_ares_maybe_finish_next_locked(exec_ctx, r); @@ -353,6 +353,7 @@ static void dns_ares_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); + gpr_free(r->backoff); gpr_free(r); } @@ -381,11 +382,14 @@ static grpc_resolver* dns_ares_create(grpc_exec_ctx* exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 589c74807f2..35c909e2a41 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -32,6 +32,7 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -70,7 +71,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::Backoff* backoff; /** currently resolving addresses */ grpc_resolved_addresses* addresses; @@ -113,7 +114,7 @@ static void dns_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(exec_ctx, r); } } @@ -126,7 +127,7 @@ static void dns_next_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(exec_ctx, r); } else { dns_maybe_finish_next_locked(exec_ctx, r); @@ -170,8 +171,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(exec_ctx, addresses); } else { - grpc_millis next_try = - grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time; grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -233,6 +233,7 @@ static void dns_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); + gpr_free(r->backoff); gpr_free(r); } @@ -257,11 +258,14 @@ static grpc_resolver* dns_create(grpc_exec_ctx* exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 58e294d597f..a0b441a73ab 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -39,6 +39,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -72,6 +73,9 @@ typedef struct external_state_watcher { } external_state_watcher; struct grpc_subchannel { + grpc_subchannel(const grpc_core::Backoff::Options& backoff_options) + : backoff(backoff_options) {} + grpc_connector* connector; /** refcount @@ -118,8 +122,8 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** backoff state */ - grpc_backoff backoff_state; - grpc_backoff_result backoff_result; + grpc_core::Backoff backoff; + grpc_core::Backoff::Result backoff_result; /** do we have an active alarm? */ bool have_alarm; @@ -283,6 +287,52 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx, } } +static grpc_core::Backoff::Options extract_backoff_options( + const grpc_channel_args* args) { + int initial_backoff_ms = + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; + int min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; + int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + bool fixed_reconnect_backoff = false; + if (args != nullptr) { + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, + "grpc.testing.fixed_reconnect_backoff_ms")) { + fixed_reconnect_backoff = true; + initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms = + grpc_channel_arg_get_integer(&args->args[i], + {initial_backoff_ms, 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + min_connect_timeout_ms = grpc_channel_arg_get_integer( + &args->args[i], {min_connect_timeout_ms, 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + max_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {max_backoff_ms, 100, INT_MAX}); + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + initial_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {initial_backoff_ms, 100, INT_MAX}); + } + } + } + grpc_core::Backoff::Options backoff_options; + backoff_options.set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_min_connect_timeout(min_connect_timeout_ms) + .set_max_backoff(max_backoff_ms); + return backoff_options; +} + grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, grpc_connector* connector, const grpc_subchannel_args* args) { @@ -294,7 +344,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, } GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); - c = (grpc_subchannel*)gpr_zalloc(sizeof(*c)); + c = GPR_NEW(grpc_subchannel(extract_backoff_options(args->args))); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; @@ -336,7 +386,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, "subchannel"); int initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; + int min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (c->args) { @@ -344,14 +395,14 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, if (0 == strcmp(c->args->args[i].key, "grpc.testing.fixed_reconnect_backoff_ms")) { fixed_reconnect_backoff = true; - initial_backoff_ms = min_backoff_ms = max_backoff_ms = + initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms = grpc_channel_arg_get_integer(&c->args->args[i], {initial_backoff_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; - min_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {min_backoff_ms, 100, INT_MAX}); + min_connect_timeout_ms = grpc_channel_arg_get_integer( + &c->args->args[i], {min_connect_timeout_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; @@ -365,12 +416,15 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, } } } - grpc_backoff_init( - &c->backoff_state, initial_backoff_ms, - fixed_reconnect_backoff ? 1.0 - : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - min_backoff_ms, max_backoff_ms); + grpc_core::Backoff::Options backoff_options; + backoff_options.set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_min_connect_timeout(min_connect_timeout_ms) + .set_max_backoff(max_backoff_ms); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -429,7 +483,7 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->backoff_result = grpc_backoff_step(exec_ctx, &c->backoff_state); + c->backoff_result = c->backoff.Step(exec_ctx); continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -466,7 +520,7 @@ static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx, if (!c->backoff_begun) { c->backoff_begun = true; - c->backoff_result = grpc_backoff_begin(exec_ctx, &c->backoff_state); + c->backoff_result = c->backoff.Begin(exec_ctx); continue_connect_locked(exec_ctx, c); } else { GPR_ASSERT(!c->have_alarm); diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index dc754ddd821..7376ed6d91e 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -18,63 +18,52 @@ #include "src/core/lib/backoff/backoff.h" +#include +#include + #include -void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff, - double multiplier, double jitter, - grpc_millis min_connect_timeout, - grpc_millis max_backoff) { - backoff->initial_backoff = initial_backoff; - backoff->multiplier = multiplier; - backoff->jitter = jitter; - backoff->min_connect_timeout = min_connect_timeout; - backoff->max_backoff = max_backoff; - backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; -} +namespace grpc_core { -grpc_backoff_result grpc_backoff_begin(grpc_exec_ctx* exec_ctx, - grpc_backoff* backoff) { - backoff->current_backoff = backoff->initial_backoff; - const grpc_millis initial_timeout = - GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout); - const grpc_millis now = grpc_exec_ctx_now(exec_ctx); - const grpc_backoff_result result = {now + initial_timeout, - now + backoff->current_backoff}; - return result; +namespace { +static double generate_uniform_random_number_between(double a, double b) { + if (a == b) return a; + if (a > b) GPR_SWAP(double, a, b); // make sure a < b + const double range = b - a; + const double zero_to_one_rand = rand() / (double)RAND_MAX; + return a + zero_to_one_rand * range; } +} // namespace -/* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(uint32_t* rng_state) { - *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31); - return *rng_state / (double)((uint32_t)1 << 31); +Backoff::Backoff(const Options& options) : options_(options) { + seed = (unsigned int)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; } -static double generate_uniform_random_number_between(uint32_t* rng_state, - double a, double b) { - if (a == b) return a; - if (a > b) GPR_SWAP(double, a, b); // make sure a < b - const double range = b - a; - return a + generate_uniform_random_number(rng_state) * range; +Backoff::Result Backoff::Begin(grpc_exec_ctx* exec_ctx) { + current_backoff_ = options_.initial_backoff(); + const grpc_millis initial_timeout = + std::max(options_.initial_backoff(), options_.min_connect_timeout()); + const grpc_millis now = grpc_exec_ctx_now(exec_ctx); + return Backoff::Result{now + initial_timeout, now + current_backoff_}; } -grpc_backoff_result grpc_backoff_step(grpc_exec_ctx* exec_ctx, - grpc_backoff* backoff) { - backoff->current_backoff = (grpc_millis)(GPR_MIN( - backoff->current_backoff * backoff->multiplier, backoff->max_backoff)); +Backoff::Result Backoff::Step(grpc_exec_ctx* exec_ctx) { + current_backoff_ = + (grpc_millis)(std::min(current_backoff_ * options_.multiplier(), + (double)options_.max_backoff())); const double jitter = generate_uniform_random_number_between( - &backoff->rng_state, -backoff->jitter * backoff->current_backoff, - backoff->jitter * backoff->current_backoff); - const grpc_millis current_timeout = - GPR_MAX((grpc_millis)(backoff->current_backoff + jitter), - backoff->min_connect_timeout); - const grpc_millis next_timeout = GPR_MIN( - (grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff); + -options_.jitter() * current_backoff_, + options_.jitter() * current_backoff_); + const grpc_millis current_timeout = std::max( + (grpc_millis)(current_backoff_ + jitter), options_.min_connect_timeout()); + const grpc_millis next_timeout = std::min( + (grpc_millis)(current_backoff_ + jitter), options_.max_backoff()); const grpc_millis now = grpc_exec_ctx_now(exec_ctx); - const grpc_backoff_result result = {now + current_timeout, - now + next_timeout}; - return result; + return Backoff::Result{now + current_timeout, now + next_timeout}; } -void grpc_backoff_reset(grpc_backoff* backoff) { - backoff->current_backoff = backoff->initial_backoff; -} +void Backoff::Reset() { current_backoff_ = options_.initial_backoff(); } + +void Backoff::SetRandomSeed(uint32_t seed) { srand(seed); } + +} // namespace grpc_core diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h index 1067281403a..ab644b979a6 100644 --- a/src/core/lib/backoff/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -21,63 +21,85 @@ #include "src/core/lib/iomgr/exec_ctx.h" -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct { - /// const: how long to wait after the first failure before retrying - grpc_millis initial_backoff; - - /// const: factor with which to multiply backoff after a failed retry - double multiplier; - - /// const: amount to randomize backoffs - double jitter; - - /// const: minimum time between retries - grpc_millis min_connect_timeout; - - /// const: maximum time between retries - grpc_millis max_backoff; - +namespace grpc_core { + +class Backoff { + public: + class Options; + struct Result; + + /// Initialize backoff machinery - does not need to be destroyed + explicit Backoff(const Options& options); + + /// Begin retry loop: returns the deadlines to be used for the current attempt + /// and the subsequent retry, if any. + Result Begin(grpc_exec_ctx* exec_ctx); + /// Step a retry loop: returns the deadlines to be used for the current + /// attempt and the subsequent retry, if any. + Result Step(grpc_exec_ctx* exec_ctx); + /// Reset the backoff, so the next grpc_backoff_step will be a + /// grpc_backoff_begin. + void Reset(); + + void SetRandomSeed(unsigned int seed); + + class Options { + public: + Options& set_initial_backoff(grpc_millis initial_backoff) { + initial_backoff_ = initial_backoff; + return *this; + } + Options& set_multiplier(double multiplier) { + multiplier_ = multiplier; + return *this; + } + Options& set_jitter(double jitter) { + jitter_ = jitter; + return *this; + } + Options& set_min_connect_timeout(grpc_millis min_connect_timeout) { + min_connect_timeout_ = min_connect_timeout; + return *this; + } + Options& set_max_backoff(grpc_millis max_backoff) { + max_backoff_ = max_backoff; + return *this; + } + /// how long to wait after the first failure before retrying + grpc_millis initial_backoff() const { return initial_backoff_; } + /// factor with which to multiply backoff after a failed retry + double multiplier() const { return multiplier_; } + /// amount to randomize backoffs + double jitter() const { return jitter_; } + /// minimum time between retries + grpc_millis min_connect_timeout() const { return min_connect_timeout_; } + /// maximum time between retries + grpc_millis max_backoff() const { return max_backoff_; } + + private: + grpc_millis initial_backoff_; + double multiplier_; + double jitter_; + grpc_millis min_connect_timeout_; + grpc_millis max_backoff_; + }; // class Options + + struct Result { + /// Deadline to be used for the current attempt. + grpc_millis current_deadline; + /// Deadline to be used for the next attempt, following the backoff + /// strategy. + grpc_millis next_attempt_start_time; + }; + + private: + const Options options_; /// current delay before retries - grpc_millis current_backoff; - - /// random number generator - uint32_t rng_state; -} grpc_backoff; - -typedef struct { - /// Deadline to be used for the current attempt. - grpc_millis current_deadline; - - /// Deadline to be used for the next attempt, following the backoff strategy. - grpc_millis next_attempt_start_time; -} grpc_backoff_result; - -/// Initialize backoff machinery - does not need to be destroyed -void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff, - double multiplier, double jitter, - grpc_millis min_connect_timeout, - grpc_millis max_backoff); - -/// Begin retry loop: returns the deadlines to be used for the current attempt -/// and the subsequent retry, if any. -grpc_backoff_result grpc_backoff_begin(grpc_exec_ctx* exec_ctx, - grpc_backoff* backoff); - -/// Step a retry loop: returns the deadlines to be used for the current attempt -/// and the subsequent retry, if any. -grpc_backoff_result grpc_backoff_step(grpc_exec_ctx* exec_ctx, - grpc_backoff* backoff); + grpc_millis current_backoff_; -/// Reset the backoff, so the next grpc_backoff_step will be a -/// grpc_backoff_begin. -void grpc_backoff_reset(grpc_backoff* backoff); + unsigned int seed; +}; -#ifdef __cplusplus -} -#endif +} // namespace grpc_core #endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */ diff --git a/src/core/lib/support/alloc_new.h b/src/core/lib/support/alloc_new.h new file mode 100644 index 00000000000..314c114cd9a --- /dev/null +++ b/src/core/lib/support/alloc_new.h @@ -0,0 +1,30 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_SUPPORT_ALLOC_NEW_H +#define GRPC_SUPPORT_ALLOC_NEW_H + +#include + +#define GPR_NEW(expr) new (gpr_zalloc) expr + +inline void* operator new(size_t sz, void* (*alloc_fn)(size_t)) { + return alloc_fn(sz); +} + +#endif /* GRPC_SUPPORT_ALLOC_NEW_H */ diff --git a/test/core/backoff/backoff_test.cc b/test/core/backoff/backoff_test.cc index ef2de8d6388..24635568678 100644 --- a/test/core/backoff/backoff_test.cc +++ b/test/core/backoff/backoff_test.cc @@ -23,24 +23,32 @@ #include "test/core/util/test_config.h" +namespace grpc_core { +namespace { + static void test_constant_backoff(void) { - grpc_backoff backoff; const grpc_millis initial_backoff = 200; const double multiplier = 1.0; const double jitter = 0.0; const grpc_millis min_connect_timeout = 100; const grpc_millis max_backoff = 1000; - grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter, - min_connect_timeout, max_backoff); + Backoff::Options options; + options.set_initial_backoff(initial_backoff) + .set_multiplier(multiplier) + .set_jitter(jitter) + .set_min_connect_timeout(min_connect_timeout) + .set_max_backoff(max_backoff); + Backoff backoff(options); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_backoff_result next_deadlines = grpc_backoff_begin(&exec_ctx, &backoff); + Backoff::Result next_deadlines = backoff.Begin(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) == initial_backoff); GPR_ASSERT(next_deadlines.next_attempt_start_time - grpc_exec_ctx_now(&exec_ctx) == initial_backoff); for (int i = 0; i < 10000; i++) { - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) == initial_backoff); GPR_ASSERT(next_deadlines.next_attempt_start_time - @@ -52,16 +60,20 @@ static void test_constant_backoff(void) { } static void test_min_connect(void) { - grpc_backoff backoff; const grpc_millis initial_backoff = 100; const double multiplier = 1.0; const double jitter = 0.0; const grpc_millis min_connect_timeout = 200; const grpc_millis max_backoff = 1000; - grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter, - min_connect_timeout, max_backoff); + Backoff::Options options; + options.set_initial_backoff(initial_backoff) + .set_multiplier(multiplier) + .set_jitter(jitter) + .set_min_connect_timeout(min_connect_timeout) + .set_max_backoff(max_backoff); + Backoff backoff(options); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_backoff_result next = grpc_backoff_begin(&exec_ctx, &backoff); + Backoff::Result next = backoff.Begin(&exec_ctx); // Because the min_connect_timeout > initial_backoff, current_deadline is used // as the deadline for the current attempt. GPR_ASSERT(next.current_deadline - grpc_exec_ctx_now(&exec_ctx) == @@ -74,57 +86,61 @@ static void test_min_connect(void) { } static void test_no_jitter_backoff(void) { - grpc_backoff backoff; const grpc_millis initial_backoff = 2; const double multiplier = 2.0; const double jitter = 0.0; const grpc_millis min_connect_timeout = 1; const grpc_millis max_backoff = 513; - grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter, - min_connect_timeout, max_backoff); + Backoff::Options options; + options.set_initial_backoff(initial_backoff) + .set_multiplier(multiplier) + .set_jitter(jitter) + .set_min_connect_timeout(min_connect_timeout) + .set_max_backoff(max_backoff); + Backoff backoff(options); // x_1 = 2 // x_n = 2**i + x_{i-1} ( = 2**(n+1) - 2 ) grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; exec_ctx.now = 0; exec_ctx.now_is_valid = true; - grpc_backoff_result next_deadlines = grpc_backoff_begin(&exec_ctx, &backoff); + Backoff::Result next_deadlines = backoff.Begin(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == next_deadlines.next_attempt_start_time); GPR_ASSERT(next_deadlines.current_deadline == 2); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 6); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 14); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 30); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 62); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 126); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 254); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 510); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 1022); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); // Hit the maximum timeout. From this point onwards, retries will increase // only by max timeout. GPR_ASSERT(next_deadlines.current_deadline == 1535); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 2048); exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline == 2561); grpc_exec_ctx_finish(&exec_ctx); } @@ -136,14 +152,18 @@ static void test_jitter_backoff(void) { const grpc_millis min_connect_timeout = 100; const double multiplier = 1.0; const double jitter = 0.1; - grpc_backoff backoff; - grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter, - min_connect_timeout, max_backoff); + Backoff::Options options; + options.set_initial_backoff(initial_backoff) + .set_multiplier(multiplier) + .set_jitter(jitter) + .set_min_connect_timeout(min_connect_timeout) + .set_max_backoff(max_backoff); + Backoff backoff(options); - backoff.rng_state = 0; // force consistent PRNG + backoff.SetRandomSeed(0); // force consistent PRNG grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_backoff_result next_deadlines = grpc_backoff_begin(&exec_ctx, &backoff); + Backoff::Result next_deadlines = backoff.Begin(&exec_ctx); GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) == initial_backoff); GPR_ASSERT(next_deadlines.next_attempt_start_time - @@ -156,7 +176,7 @@ static void test_jitter_backoff(void) { (grpc_millis)((double)current_backoff * (1 + jitter)); for (int i = 0; i < 10000; i++) { - next_deadlines = grpc_backoff_step(&exec_ctx, &backoff); + next_deadlines = backoff.Step(&exec_ctx); // next-now must be within (jitter*100)% of the current backoff (which // increases by * multiplier up to max_backoff). const grpc_millis timeout_millis = @@ -173,15 +193,17 @@ static void test_jitter_backoff(void) { } grpc_exec_ctx_finish(&exec_ctx); } +} // namespace +} // namespace grpc_core int main(int argc, char** argv) { grpc_test_init(argc, argv); gpr_time_init(); - test_constant_backoff(); - test_min_connect(); - test_no_jitter_backoff(); - test_jitter_backoff(); + grpc_core::test_constant_backoff(); + grpc_core::test_min_connect(); + grpc_core::test_no_jitter_backoff(); + grpc_core::test_jitter_backoff(); return 0; }