C++-ize backoff

reviewable/pr13494/r1
David Garcia Quintas 7 years ago
parent 070a14f0cd
commit dde6afc0c0
  1. 25
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 24
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  3. 24
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  4. 84
      src/core/ext/filters/client_channel/subchannel.cc
  5. 83
      src/core/lib/backoff/backoff.cc
  6. 132
      src/core/lib/backoff/backoff.h
  7. 30
      src/core/lib/support/alloc_new.h
  8. 90
      test/core/backoff/backoff_test.cc

@ -113,6 +113,7 @@
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/alloc_new.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
@ -397,7 +398,7 @@ typedef struct glb_lb_policy {
grpc_slice lb_call_status_details;
/** LB call retry backoff state */
grpc_backoff lb_call_backoff_state;
grpc_core::Backoff* lb_call_backoff;
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
@ -986,6 +987,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
grpc_subchannel_index_unref();
gpr_free(glb_policy->lb_call_backoff);
gpr_free(glb_policy);
}
@ -1150,7 +1152,7 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx,
}
glb_policy->started_picking = true;
grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(exec_ctx, glb_policy);
}
@ -1291,8 +1293,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx,
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
grpc_millis next_try =
grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state)
.next_attempt_start_time;
glb_policy->lb_call_backoff->Step(exec_ctx).next_attempt_start_time;
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
glb_policy);
@ -1461,12 +1462,14 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx,
lb_on_response_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
GRPC_GRPCLB_RECONNECT_JITTER,
GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
grpc_core::Backoff::Options backoff_options;
backoff_options
.set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
.set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
.set_min_connect_timeout(GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
glb_policy->lb_call_backoff = GPR_NEW(grpc_core::Backoff(backoff_options));
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
@ -1573,7 +1576,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
memset(ops, 0, sizeof(ops));
grpc_op* op = ops;
if (glb_policy->lb_response_payload != nullptr) {
grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
glb_policy->lb_call_backoff->Reset();
/* Received data from the LB server. Look inside
* glb_policy->lb_response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;

@ -39,6 +39,7 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/support/alloc_new.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/service_config.h"
@ -89,7 +90,7 @@ typedef struct {
bool have_retry_timer;
grpc_timer retry_timer;
/** retry backoff state */
grpc_backoff backoff_state;
grpc_core::Backoff* backoff;
/** currently resolving addresses */
grpc_lb_addresses* lb_addresses;
@ -137,7 +138,7 @@ static void dns_ares_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
grpc_resolver* resolver) {
ares_dns_resolver* r = (ares_dns_resolver*)resolver;
if (!r->resolving) {
grpc_backoff_reset(&r->backoff_state);
r->backoff->Reset();
dns_ares_start_resolving_locked(exec_ctx, r);
}
}
@ -272,8 +273,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg,
} else {
const char* msg = grpc_error_string(error);
gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
grpc_millis next_try =
grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time;
grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time;
grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx);
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@ -307,7 +307,7 @@ static void dns_ares_next_locked(grpc_exec_ctx* exec_ctx,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
grpc_backoff_reset(&r->backoff_state);
r->backoff->Reset();
dns_ares_start_resolving_locked(exec_ctx, r);
} else {
dns_ares_maybe_finish_next_locked(exec_ctx, r);
@ -353,6 +353,7 @@ static void dns_ares_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r->backoff);
gpr_free(r);
}
@ -381,11 +382,14 @@ static grpc_resolver* dns_ares_create(grpc_exec_ctx* exec_ctx,
grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties,
args->pollset_set);
}
grpc_backoff_init(
&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
grpc_core::Backoff::Options backoff_options;
backoff_options
.set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
.set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_DNS_RECONNECT_JITTER)
.set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000)
.set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options));
GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked,
dns_ares_on_retry_timer_locked, r,
grpc_combiner_scheduler(r->base.combiner));

@ -32,6 +32,7 @@
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/alloc_new.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
@ -70,7 +71,7 @@ typedef struct {
grpc_timer retry_timer;
grpc_closure on_retry;
/** retry backoff state */
grpc_backoff backoff_state;
grpc_core::Backoff* backoff;
/** currently resolving addresses */
grpc_resolved_addresses* addresses;
@ -113,7 +114,7 @@ static void dns_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
grpc_resolver* resolver) {
dns_resolver* r = (dns_resolver*)resolver;
if (!r->resolving) {
grpc_backoff_reset(&r->backoff_state);
r->backoff->Reset();
dns_start_resolving_locked(exec_ctx, r);
}
}
@ -126,7 +127,7 @@ static void dns_next_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
grpc_backoff_reset(&r->backoff_state);
r->backoff->Reset();
dns_start_resolving_locked(exec_ctx, r);
} else {
dns_maybe_finish_next_locked(exec_ctx, r);
@ -170,8 +171,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg,
grpc_resolved_addresses_destroy(r->addresses);
grpc_lb_addresses_destroy(exec_ctx, addresses);
} else {
grpc_millis next_try =
grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time;
grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time;
grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx);
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@ -233,6 +233,7 @@ static void dns_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r->backoff);
gpr_free(r);
}
@ -257,11 +258,14 @@ static grpc_resolver* dns_create(grpc_exec_ctx* exec_ctx,
grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties,
args->pollset_set);
}
grpc_backoff_init(
&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
grpc_core::Backoff::Options backoff_options;
backoff_options
.set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
.set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_DNS_RECONNECT_JITTER)
.set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000)
.set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options));
return &r->base;
}

@ -39,6 +39,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/alloc_new.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -72,6 +73,9 @@ typedef struct external_state_watcher {
} external_state_watcher;
struct grpc_subchannel {
grpc_subchannel(const grpc_core::Backoff::Options& backoff_options)
: backoff(backoff_options) {}
grpc_connector* connector;
/** refcount
@ -118,8 +122,8 @@ struct grpc_subchannel {
external_state_watcher root_external_state_watcher;
/** backoff state */
grpc_backoff backoff_state;
grpc_backoff_result backoff_result;
grpc_core::Backoff backoff;
grpc_core::Backoff::Result backoff_result;
/** do we have an active alarm? */
bool have_alarm;
@ -283,6 +287,52 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx,
}
}
static grpc_core::Backoff::Options extract_backoff_options(
const grpc_channel_args* args) {
int initial_backoff_ms =
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
int min_connect_timeout_ms =
GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
bool fixed_reconnect_backoff = false;
if (args != nullptr) {
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key,
"grpc.testing.fixed_reconnect_backoff_ms")) {
fixed_reconnect_backoff = true;
initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms =
grpc_channel_arg_get_integer(&args->args[i],
{initial_backoff_ms, 100, INT_MAX});
} else if (0 ==
strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
min_connect_timeout_ms = grpc_channel_arg_get_integer(
&args->args[i], {min_connect_timeout_ms, 100, INT_MAX});
} else if (0 ==
strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
max_backoff_ms = grpc_channel_arg_get_integer(
&args->args[i], {max_backoff_ms, 100, INT_MAX});
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
initial_backoff_ms = grpc_channel_arg_get_integer(
&args->args[i], {initial_backoff_ms, 100, INT_MAX});
}
}
}
grpc_core::Backoff::Options backoff_options;
backoff_options.set_initial_backoff(initial_backoff_ms)
.set_multiplier(fixed_reconnect_backoff
? 1.0
: GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(fixed_reconnect_backoff ? 0.0
: GRPC_SUBCHANNEL_RECONNECT_JITTER)
.set_min_connect_timeout(min_connect_timeout_ms)
.set_max_backoff(max_backoff_ms);
return backoff_options;
}
grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
grpc_connector* connector,
const grpc_subchannel_args* args) {
@ -294,7 +344,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
}
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx);
c = (grpc_subchannel*)gpr_zalloc(sizeof(*c));
c = GPR_NEW(grpc_subchannel(extract_backoff_options(args->args)));
c->key = key;
gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
c->connector = connector;
@ -336,7 +386,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
"subchannel");
int initial_backoff_ms =
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
int min_connect_timeout_ms =
GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
bool fixed_reconnect_backoff = false;
if (c->args) {
@ -344,14 +395,14 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
if (0 == strcmp(c->args->args[i].key,
"grpc.testing.fixed_reconnect_backoff_ms")) {
fixed_reconnect_backoff = true;
initial_backoff_ms = min_backoff_ms = max_backoff_ms =
initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms =
grpc_channel_arg_get_integer(&c->args->args[i],
{initial_backoff_ms, 100, INT_MAX});
} else if (0 == strcmp(c->args->args[i].key,
GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
min_backoff_ms = grpc_channel_arg_get_integer(
&c->args->args[i], {min_backoff_ms, 100, INT_MAX});
min_connect_timeout_ms = grpc_channel_arg_get_integer(
&c->args->args[i], {min_connect_timeout_ms, 100, INT_MAX});
} else if (0 == strcmp(c->args->args[i].key,
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
@ -365,12 +416,15 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
}
}
}
grpc_backoff_init(
&c->backoff_state, initial_backoff_ms,
fixed_reconnect_backoff ? 1.0
: GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
min_backoff_ms, max_backoff_ms);
grpc_core::Backoff::Options backoff_options;
backoff_options.set_initial_backoff(initial_backoff_ms)
.set_multiplier(fixed_reconnect_backoff
? 1.0
: GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(fixed_reconnect_backoff ? 0.0
: GRPC_SUBCHANNEL_RECONNECT_JITTER)
.set_min_connect_timeout(min_connect_timeout_ms)
.set_max_backoff(max_backoff_ms);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(exec_ctx, key, c);
@ -429,7 +483,7 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
}
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
c->backoff_result = grpc_backoff_step(exec_ctx, &c->backoff_state);
c->backoff_result = c->backoff.Step(exec_ctx);
continue_connect_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
} else {
@ -466,7 +520,7 @@ static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx,
if (!c->backoff_begun) {
c->backoff_begun = true;
c->backoff_result = grpc_backoff_begin(exec_ctx, &c->backoff_state);
c->backoff_result = c->backoff.Begin(exec_ctx);
continue_connect_locked(exec_ctx, c);
} else {
GPR_ASSERT(!c->have_alarm);

@ -18,63 +18,52 @@
#include "src/core/lib/backoff/backoff.h"
#include <algorithm>
#include <cstdlib>
#include <grpc/support/useful.h>
void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff,
double multiplier, double jitter,
grpc_millis min_connect_timeout,
grpc_millis max_backoff) {
backoff->initial_backoff = initial_backoff;
backoff->multiplier = multiplier;
backoff->jitter = jitter;
backoff->min_connect_timeout = min_connect_timeout;
backoff->max_backoff = max_backoff;
backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec;
}
namespace grpc_core {
grpc_backoff_result grpc_backoff_begin(grpc_exec_ctx* exec_ctx,
grpc_backoff* backoff) {
backoff->current_backoff = backoff->initial_backoff;
const grpc_millis initial_timeout =
GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout);
const grpc_millis now = grpc_exec_ctx_now(exec_ctx);
const grpc_backoff_result result = {now + initial_timeout,
now + backoff->current_backoff};
return result;
namespace {
static double generate_uniform_random_number_between(double a, double b) {
if (a == b) return a;
if (a > b) GPR_SWAP(double, a, b); // make sure a < b
const double range = b - a;
const double zero_to_one_rand = rand() / (double)RAND_MAX;
return a + zero_to_one_rand * range;
}
} // namespace
/* Generate a random number between 0 and 1. */
static double generate_uniform_random_number(uint32_t* rng_state) {
*rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31);
return *rng_state / (double)((uint32_t)1 << 31);
Backoff::Backoff(const Options& options) : options_(options) {
seed = (unsigned int)gpr_now(GPR_CLOCK_REALTIME).tv_nsec;
}
static double generate_uniform_random_number_between(uint32_t* rng_state,
double a, double b) {
if (a == b) return a;
if (a > b) GPR_SWAP(double, a, b); // make sure a < b
const double range = b - a;
return a + generate_uniform_random_number(rng_state) * range;
Backoff::Result Backoff::Begin(grpc_exec_ctx* exec_ctx) {
current_backoff_ = options_.initial_backoff();
const grpc_millis initial_timeout =
std::max(options_.initial_backoff(), options_.min_connect_timeout());
const grpc_millis now = grpc_exec_ctx_now(exec_ctx);
return Backoff::Result{now + initial_timeout, now + current_backoff_};
}
grpc_backoff_result grpc_backoff_step(grpc_exec_ctx* exec_ctx,
grpc_backoff* backoff) {
backoff->current_backoff = (grpc_millis)(GPR_MIN(
backoff->current_backoff * backoff->multiplier, backoff->max_backoff));
Backoff::Result Backoff::Step(grpc_exec_ctx* exec_ctx) {
current_backoff_ =
(grpc_millis)(std::min(current_backoff_ * options_.multiplier(),
(double)options_.max_backoff()));
const double jitter = generate_uniform_random_number_between(
&backoff->rng_state, -backoff->jitter * backoff->current_backoff,
backoff->jitter * backoff->current_backoff);
const grpc_millis current_timeout =
GPR_MAX((grpc_millis)(backoff->current_backoff + jitter),
backoff->min_connect_timeout);
const grpc_millis next_timeout = GPR_MIN(
(grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff);
-options_.jitter() * current_backoff_,
options_.jitter() * current_backoff_);
const grpc_millis current_timeout = std::max(
(grpc_millis)(current_backoff_ + jitter), options_.min_connect_timeout());
const grpc_millis next_timeout = std::min(
(grpc_millis)(current_backoff_ + jitter), options_.max_backoff());
const grpc_millis now = grpc_exec_ctx_now(exec_ctx);
const grpc_backoff_result result = {now + current_timeout,
now + next_timeout};
return result;
return Backoff::Result{now + current_timeout, now + next_timeout};
}
void grpc_backoff_reset(grpc_backoff* backoff) {
backoff->current_backoff = backoff->initial_backoff;
}
void Backoff::Reset() { current_backoff_ = options_.initial_backoff(); }
void Backoff::SetRandomSeed(uint32_t seed) { srand(seed); }
} // namespace grpc_core

@ -21,63 +21,85 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
/// const: how long to wait after the first failure before retrying
grpc_millis initial_backoff;
/// const: factor with which to multiply backoff after a failed retry
double multiplier;
/// const: amount to randomize backoffs
double jitter;
/// const: minimum time between retries
grpc_millis min_connect_timeout;
/// const: maximum time between retries
grpc_millis max_backoff;
namespace grpc_core {
class Backoff {
public:
class Options;
struct Result;
/// Initialize backoff machinery - does not need to be destroyed
explicit Backoff(const Options& options);
/// Begin retry loop: returns the deadlines to be used for the current attempt
/// and the subsequent retry, if any.
Result Begin(grpc_exec_ctx* exec_ctx);
/// Step a retry loop: returns the deadlines to be used for the current
/// attempt and the subsequent retry, if any.
Result Step(grpc_exec_ctx* exec_ctx);
/// Reset the backoff, so the next grpc_backoff_step will be a
/// grpc_backoff_begin.
void Reset();
void SetRandomSeed(unsigned int seed);
class Options {
public:
Options& set_initial_backoff(grpc_millis initial_backoff) {
initial_backoff_ = initial_backoff;
return *this;
}
Options& set_multiplier(double multiplier) {
multiplier_ = multiplier;
return *this;
}
Options& set_jitter(double jitter) {
jitter_ = jitter;
return *this;
}
Options& set_min_connect_timeout(grpc_millis min_connect_timeout) {
min_connect_timeout_ = min_connect_timeout;
return *this;
}
Options& set_max_backoff(grpc_millis max_backoff) {
max_backoff_ = max_backoff;
return *this;
}
/// how long to wait after the first failure before retrying
grpc_millis initial_backoff() const { return initial_backoff_; }
/// factor with which to multiply backoff after a failed retry
double multiplier() const { return multiplier_; }
/// amount to randomize backoffs
double jitter() const { return jitter_; }
/// minimum time between retries
grpc_millis min_connect_timeout() const { return min_connect_timeout_; }
/// maximum time between retries
grpc_millis max_backoff() const { return max_backoff_; }
private:
grpc_millis initial_backoff_;
double multiplier_;
double jitter_;
grpc_millis min_connect_timeout_;
grpc_millis max_backoff_;
}; // class Options
struct Result {
/// Deadline to be used for the current attempt.
grpc_millis current_deadline;
/// Deadline to be used for the next attempt, following the backoff
/// strategy.
grpc_millis next_attempt_start_time;
};
private:
const Options options_;
/// current delay before retries
grpc_millis current_backoff;
/// random number generator
uint32_t rng_state;
} grpc_backoff;
typedef struct {
/// Deadline to be used for the current attempt.
grpc_millis current_deadline;
/// Deadline to be used for the next attempt, following the backoff strategy.
grpc_millis next_attempt_start_time;
} grpc_backoff_result;
/// Initialize backoff machinery - does not need to be destroyed
void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff,
double multiplier, double jitter,
grpc_millis min_connect_timeout,
grpc_millis max_backoff);
/// Begin retry loop: returns the deadlines to be used for the current attempt
/// and the subsequent retry, if any.
grpc_backoff_result grpc_backoff_begin(grpc_exec_ctx* exec_ctx,
grpc_backoff* backoff);
/// Step a retry loop: returns the deadlines to be used for the current attempt
/// and the subsequent retry, if any.
grpc_backoff_result grpc_backoff_step(grpc_exec_ctx* exec_ctx,
grpc_backoff* backoff);
grpc_millis current_backoff_;
/// Reset the backoff, so the next grpc_backoff_step will be a
/// grpc_backoff_begin.
void grpc_backoff_reset(grpc_backoff* backoff);
unsigned int seed;
};
#ifdef __cplusplus
}
#endif
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */

@ -0,0 +1,30 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_SUPPORT_ALLOC_NEW_H
#define GRPC_SUPPORT_ALLOC_NEW_H
#include <grpc/support/alloc.h>
#define GPR_NEW(expr) new (gpr_zalloc) expr
inline void* operator new(size_t sz, void* (*alloc_fn)(size_t)) {
return alloc_fn(sz);
}
#endif /* GRPC_SUPPORT_ALLOC_NEW_H */

@ -23,24 +23,32 @@
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace {
static void test_constant_backoff(void) {
grpc_backoff backoff;
const grpc_millis initial_backoff = 200;
const double multiplier = 1.0;
const double jitter = 0.0;
const grpc_millis min_connect_timeout = 100;
const grpc_millis max_backoff = 1000;
grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter,
min_connect_timeout, max_backoff);
Backoff::Options options;
options.set_initial_backoff(initial_backoff)
.set_multiplier(multiplier)
.set_jitter(jitter)
.set_min_connect_timeout(min_connect_timeout)
.set_max_backoff(max_backoff);
Backoff backoff(options);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_backoff_result next_deadlines = grpc_backoff_begin(&exec_ctx, &backoff);
Backoff::Result next_deadlines = backoff.Begin(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) ==
initial_backoff);
GPR_ASSERT(next_deadlines.next_attempt_start_time -
grpc_exec_ctx_now(&exec_ctx) ==
initial_backoff);
for (int i = 0; i < 10000; i++) {
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) ==
initial_backoff);
GPR_ASSERT(next_deadlines.next_attempt_start_time -
@ -52,16 +60,20 @@ static void test_constant_backoff(void) {
}
static void test_min_connect(void) {
grpc_backoff backoff;
const grpc_millis initial_backoff = 100;
const double multiplier = 1.0;
const double jitter = 0.0;
const grpc_millis min_connect_timeout = 200;
const grpc_millis max_backoff = 1000;
grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter,
min_connect_timeout, max_backoff);
Backoff::Options options;
options.set_initial_backoff(initial_backoff)
.set_multiplier(multiplier)
.set_jitter(jitter)
.set_min_connect_timeout(min_connect_timeout)
.set_max_backoff(max_backoff);
Backoff backoff(options);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_backoff_result next = grpc_backoff_begin(&exec_ctx, &backoff);
Backoff::Result next = backoff.Begin(&exec_ctx);
// Because the min_connect_timeout > initial_backoff, current_deadline is used
// as the deadline for the current attempt.
GPR_ASSERT(next.current_deadline - grpc_exec_ctx_now(&exec_ctx) ==
@ -74,57 +86,61 @@ static void test_min_connect(void) {
}
static void test_no_jitter_backoff(void) {
grpc_backoff backoff;
const grpc_millis initial_backoff = 2;
const double multiplier = 2.0;
const double jitter = 0.0;
const grpc_millis min_connect_timeout = 1;
const grpc_millis max_backoff = 513;
grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter,
min_connect_timeout, max_backoff);
Backoff::Options options;
options.set_initial_backoff(initial_backoff)
.set_multiplier(multiplier)
.set_jitter(jitter)
.set_min_connect_timeout(min_connect_timeout)
.set_max_backoff(max_backoff);
Backoff backoff(options);
// x_1 = 2
// x_n = 2**i + x_{i-1} ( = 2**(n+1) - 2 )
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
exec_ctx.now = 0;
exec_ctx.now_is_valid = true;
grpc_backoff_result next_deadlines = grpc_backoff_begin(&exec_ctx, &backoff);
Backoff::Result next_deadlines = backoff.Begin(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline ==
next_deadlines.next_attempt_start_time);
GPR_ASSERT(next_deadlines.current_deadline == 2);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 6);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 14);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 30);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 62);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 126);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 254);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 510);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 1022);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
// Hit the maximum timeout. From this point onwards, retries will increase
// only by max timeout.
GPR_ASSERT(next_deadlines.current_deadline == 1535);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 2048);
exec_ctx.now = next_deadlines.current_deadline;
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline == 2561);
grpc_exec_ctx_finish(&exec_ctx);
}
@ -136,14 +152,18 @@ static void test_jitter_backoff(void) {
const grpc_millis min_connect_timeout = 100;
const double multiplier = 1.0;
const double jitter = 0.1;
grpc_backoff backoff;
grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter,
min_connect_timeout, max_backoff);
Backoff::Options options;
options.set_initial_backoff(initial_backoff)
.set_multiplier(multiplier)
.set_jitter(jitter)
.set_min_connect_timeout(min_connect_timeout)
.set_max_backoff(max_backoff);
Backoff backoff(options);
backoff.rng_state = 0; // force consistent PRNG
backoff.SetRandomSeed(0); // force consistent PRNG
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_backoff_result next_deadlines = grpc_backoff_begin(&exec_ctx, &backoff);
Backoff::Result next_deadlines = backoff.Begin(&exec_ctx);
GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) ==
initial_backoff);
GPR_ASSERT(next_deadlines.next_attempt_start_time -
@ -156,7 +176,7 @@ static void test_jitter_backoff(void) {
(grpc_millis)((double)current_backoff * (1 + jitter));
for (int i = 0; i < 10000; i++) {
next_deadlines = grpc_backoff_step(&exec_ctx, &backoff);
next_deadlines = backoff.Step(&exec_ctx);
// next-now must be within (jitter*100)% of the current backoff (which
// increases by * multiplier up to max_backoff).
const grpc_millis timeout_millis =
@ -173,15 +193,17 @@ static void test_jitter_backoff(void) {
}
grpc_exec_ctx_finish(&exec_ctx);
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
gpr_time_init();
test_constant_backoff();
test_min_connect();
test_no_jitter_backoff();
test_jitter_backoff();
grpc_core::test_constant_backoff();
grpc_core::test_min_connect();
grpc_core::test_no_jitter_backoff();
grpc_core::test_jitter_backoff();
return 0;
}

Loading…
Cancel
Save