Merge pull request #13494 from dgquintas/backoff_cpp

C++-ize backoff
pull/13831/head
David G. Quintas 7 years ago committed by GitHub
commit e9b0fd07a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 69
      CMakeLists.txt
  2. 84
      Makefile
  3. 22
      build.yaml
  4. 25
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 23
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  6. 23
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  7. 117
      src/core/ext/filters/client_channel/subchannel.cc
  8. 78
      src/core/lib/backoff/backoff.cc
  9. 110
      src/core/lib/backoff/backoff.h
  10. 8
      src/core/lib/iomgr/tcp_client_posix.cc
  11. 7
      test/core/backoff/BUILD
  12. 202
      test/core/backoff/backoff_test.cc
  13. 115
      test/cpp/end2end/client_lb_end2end_test.cc
  14. 34
      tools/run_tests/generated/sources_and_headers.json
  15. 48
      tools/run_tests/generated/tests.json

@ -381,7 +381,6 @@ add_dependencies(buildtests_c algorithm_test)
add_dependencies(buildtests_c alloc_test)
add_dependencies(buildtests_c alpn_test)
add_dependencies(buildtests_c arena_test)
add_dependencies(buildtests_c backoff_test)
add_dependencies(buildtests_c bad_server_response_test)
add_dependencies(buildtests_c bin_decoder_test)
add_dependencies(buildtests_c bin_encoder_test)
@ -634,6 +633,7 @@ add_custom_target(buildtests_cxx)
add_dependencies(buildtests_cxx alarm_cpp_test)
add_dependencies(buildtests_cxx async_end2end_test)
add_dependencies(buildtests_cxx auth_property_iterator_test)
add_dependencies(buildtests_cxx backoff_test)
add_dependencies(buildtests_cxx bdp_estimator_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_arena)
@ -5110,35 +5110,6 @@ target_link_libraries(arena_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(backoff_test
test/core/backoff/backoff_test.cc
)
target_include_directories(backoff_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${PROTOBUF_ROOT_DIR}/src
PRIVATE ${BENCHMARK_ROOT_DIR}/include
PRIVATE ${ZLIB_ROOT_DIR}
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
PRIVATE ${CARES_INCLUDE_DIR}
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
)
target_link_libraries(backoff_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr_test_util
gpr
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(bad_server_response_test
test/core/end2end/bad_server_response_test.cc
)
@ -8950,6 +8921,44 @@ target_link_libraries(auth_property_iterator_test
endif (gRPC_BUILD_TESTS)
# backoff_test: test executable built from test/core/backoff/backoff_test.cc.
# The googletest and googlemock amalgamated sources are compiled straight into
# the binary, so their include roots are added below as well.
if(gRPC_BUILD_TESTS)

  add_executable(backoff_test
    test/core/backoff/backoff_test.cc
    third_party/googletest/googletest/src/gtest-all.cc
    third_party/googletest/googlemock/src/gmock-all.cc
  )

  # Header search paths: project roots first, then bundled third-party
  # dependencies, then googletest/googlemock and the generated-proto directory.
  target_include_directories(backoff_test
    PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
    PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
    PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
    PRIVATE ${PROTOBUF_ROOT_DIR}/src
    PRIVATE ${BENCHMARK_ROOT_DIR}/include
    PRIVATE ${ZLIB_ROOT_DIR}
    PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
    PRIVATE ${CARES_INCLUDE_DIR}
    PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
    PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
    PRIVATE third_party/googletest/googletest/include
    PRIVATE third_party/googletest/googletest
    PRIVATE third_party/googletest/googlemock/include
    PRIVATE third_party/googletest/googlemock
    PRIVATE ${_gRPC_PROTO_GENS_DIR}
  )

  # Link order is kept exactly as listed.
  target_link_libraries(backoff_test
    ${_gRPC_PROTOBUF_LIBRARIES}
    ${_gRPC_ALLTARGETS_LIBRARIES}
    grpc_test_util
    grpc
    gpr_test_util
    gpr
    ${_gRPC_GFLAGS_LIBRARIES}
  )

endif()
if (gRPC_BUILD_TESTS)
add_executable(bdp_estimator_test
test/core/transport/bdp_estimator_test.cc
third_party/googletest/googletest/src/gtest-all.cc

@ -951,7 +951,6 @@ alloc_test: $(BINDIR)/$(CONFIG)/alloc_test
alpn_test: $(BINDIR)/$(CONFIG)/alpn_test
api_fuzzer: $(BINDIR)/$(CONFIG)/api_fuzzer
arena_test: $(BINDIR)/$(CONFIG)/arena_test
backoff_test: $(BINDIR)/$(CONFIG)/backoff_test
bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test
bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test
bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test
@ -1094,6 +1093,7 @@ wakeup_fd_cv_test: $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test
alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test
async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test
auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test
backoff_test: $(BINDIR)/$(CONFIG)/backoff_test
bdp_estimator_test: $(BINDIR)/$(CONFIG)/bdp_estimator_test
bm_arena: $(BINDIR)/$(CONFIG)/bm_arena
bm_call_create: $(BINDIR)/$(CONFIG)/bm_call_create
@ -1352,7 +1352,6 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/alloc_test \
$(BINDIR)/$(CONFIG)/alpn_test \
$(BINDIR)/$(CONFIG)/arena_test \
$(BINDIR)/$(CONFIG)/backoff_test \
$(BINDIR)/$(CONFIG)/bad_server_response_test \
$(BINDIR)/$(CONFIG)/bin_decoder_test \
$(BINDIR)/$(CONFIG)/bin_encoder_test \
@ -1541,6 +1540,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/alarm_cpp_test \
$(BINDIR)/$(CONFIG)/async_end2end_test \
$(BINDIR)/$(CONFIG)/auth_property_iterator_test \
$(BINDIR)/$(CONFIG)/backoff_test \
$(BINDIR)/$(CONFIG)/bdp_estimator_test \
$(BINDIR)/$(CONFIG)/bm_arena \
$(BINDIR)/$(CONFIG)/bm_call_create \
@ -1669,6 +1669,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/alarm_cpp_test \
$(BINDIR)/$(CONFIG)/async_end2end_test \
$(BINDIR)/$(CONFIG)/auth_property_iterator_test \
$(BINDIR)/$(CONFIG)/backoff_test \
$(BINDIR)/$(CONFIG)/bdp_estimator_test \
$(BINDIR)/$(CONFIG)/bm_arena \
$(BINDIR)/$(CONFIG)/bm_call_create \
@ -1772,8 +1773,6 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/alpn_test || ( echo test alpn_test failed ; exit 1 )
$(E) "[RUN] Testing arena_test"
$(Q) $(BINDIR)/$(CONFIG)/arena_test || ( echo test arena_test failed ; exit 1 )
$(E) "[RUN] Testing backoff_test"
$(Q) $(BINDIR)/$(CONFIG)/backoff_test || ( echo test backoff_test failed ; exit 1 )
$(E) "[RUN] Testing bad_server_response_test"
$(Q) $(BINDIR)/$(CONFIG)/bad_server_response_test || ( echo test bad_server_response_test failed ; exit 1 )
$(E) "[RUN] Testing bin_decoder_test"
@ -2038,6 +2037,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/async_end2end_test || ( echo test async_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing auth_property_iterator_test"
$(Q) $(BINDIR)/$(CONFIG)/auth_property_iterator_test || ( echo test auth_property_iterator_test failed ; exit 1 )
$(E) "[RUN] Testing backoff_test"
$(Q) $(BINDIR)/$(CONFIG)/backoff_test || ( echo test backoff_test failed ; exit 1 )
$(E) "[RUN] Testing bdp_estimator_test"
$(Q) $(BINDIR)/$(CONFIG)/bdp_estimator_test || ( echo test bdp_estimator_test failed ; exit 1 )
$(E) "[RUN] Testing bm_arena"
@ -8912,38 +8913,6 @@ endif
endif
BACKOFF_TEST_SRC = \
test/core/backoff/backoff_test.cc \
BACKOFF_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BACKOFF_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/backoff_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/backoff_test: $(BACKOFF_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) $(BACKOFF_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/backoff_test
endif
$(OBJDIR)/$(CONFIG)/test/core/backoff/backoff_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_backoff_test: $(BACKOFF_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(BACKOFF_TEST_OBJS:.o=.dep)
endif
endif
BAD_SERVER_RESPONSE_TEST_SRC = \
test/core/end2end/bad_server_response_test.cc \
@ -13533,6 +13502,49 @@ endif
endif
BACKOFF_TEST_SRC = \
test/core/backoff/backoff_test.cc \
BACKOFF_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BACKOFF_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/backoff_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/backoff_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/backoff_test: $(PROTOBUF_DEP) $(BACKOFF_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(BACKOFF_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/backoff_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/backoff/backoff_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_backoff_test: $(BACKOFF_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(BACKOFF_TEST_OBJS:.o=.dep)
endif
endif
BDP_ESTIMATOR_TEST_SRC = \
test/core/transport/bdp_estimator_test.cc \

@ -1769,17 +1769,6 @@ targets:
- gpr_test_util
- gpr
uses_polling: false
- name: backoff_test
build: test
language: c
src:
- test/core/backoff/backoff_test.cc
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
uses_polling: false
- name: bad_server_response_test
build: test
language: c
@ -3457,6 +3446,17 @@ targets:
- gpr_test_util
- gpr
uses_polling: false
- name: backoff_test
build: test
language: c++
src:
- test/core/backoff/backoff_test.cc
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
uses_polling: false
- name: bdp_estimator_test
build: test
language: c++

@ -113,13 +113,13 @@
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"
#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
@ -408,7 +408,7 @@ typedef struct glb_lb_policy {
grpc_slice lb_call_status_details;
/** LB call retry backoff state */
grpc_backoff lb_call_backoff_state;
grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
@ -1167,7 +1167,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
}
glb_policy->started_picking = true;
grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
}
@ -1302,8 +1302,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
glb_policy->updating_lb_call = false;
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state)
.next_attempt_start_time;
grpc_millis next_try = glb_policy->lb_call_backoff->Step();
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
glb_policy);
@ -1463,12 +1462,14 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
lb_on_response_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
GRPC_GRPCLB_RECONNECT_JITTER,
GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
grpc_core::BackOff::Options backoff_options;
backoff_options
.set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
.set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
glb_policy->lb_call_backoff.Init(backoff_options);
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
@ -1572,7 +1573,7 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
memset(ops, 0, sizeof(ops));
grpc_op* op = ops;
if (glb_policy->lb_response_payload != nullptr) {
grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
glb_policy->lb_call_backoff->Reset();
/* Received data from the LB server. Look inside
* glb_policy->lb_response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;

@ -40,10 +40,10 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/service_config.h"
#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
@ -89,7 +89,7 @@ typedef struct {
bool have_retry_timer;
grpc_timer retry_timer;
/** retry backoff state */
grpc_backoff backoff_state;
grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
/** currently resolving addresses */
grpc_lb_addresses* lb_addresses;
@ -131,7 +131,7 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) {
static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) {
ares_dns_resolver* r = (ares_dns_resolver*)resolver;
if (!r->resolving) {
grpc_backoff_reset(&r->backoff_state);
r->backoff->Reset();
dns_ares_start_resolving_locked(r);
}
}
@ -264,8 +264,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) {
} else {
const char* msg = grpc_error_string(error);
gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
grpc_millis next_try =
grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
grpc_millis next_try = r->backoff->Step();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@ -298,7 +297,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
grpc_backoff_reset(&r->backoff_state);
r->backoff->Reset();
dns_ares_start_resolving_locked(r);
} else {
dns_ares_maybe_finish_next_locked(r);
@ -368,11 +367,13 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args,
if (args->pollset_set != nullptr) {
grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set);
}
grpc_backoff_init(
&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
grpc_core::BackOff::Options backoff_options;
backoff_options
.set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
.set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_DNS_RECONNECT_JITTER)
.set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
r->backoff.Init(grpc_core::BackOff(backoff_options));
GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked,
dns_ares_on_retry_timer_locked, r,
grpc_combiner_scheduler(r->base.combiner));

@ -33,9 +33,9 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
@ -70,7 +70,7 @@ typedef struct {
grpc_timer retry_timer;
grpc_closure on_retry;
/** retry backoff state */
grpc_backoff backoff_state;
grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
/** currently resolving addresses */
grpc_resolved_addresses* addresses;
@ -106,7 +106,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) {
static void dns_channel_saw_error_locked(grpc_resolver* resolver) {
dns_resolver* r = (dns_resolver*)resolver;
if (!r->resolving) {
grpc_backoff_reset(&r->backoff_state);
r->backoff->Reset();
dns_start_resolving_locked(r);
}
}
@ -119,7 +119,7 @@ static void dns_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
grpc_backoff_reset(&r->backoff_state);
r->backoff->Reset();
dns_start_resolving_locked(r);
} else {
dns_maybe_finish_next_locked(r);
@ -161,8 +161,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) {
grpc_resolved_addresses_destroy(r->addresses);
grpc_lb_addresses_destroy(addresses);
} else {
grpc_millis next_try =
grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
grpc_millis next_try = r->backoff->Step();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@ -244,11 +243,13 @@ static grpc_resolver* dns_create(grpc_resolver_args* args,
if (args->pollset_set != nullptr) {
grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set);
}
grpc_backoff_init(
&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
grpc_core::BackOff::Options backoff_options;
backoff_options
.set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
.set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_DNS_RECONNECT_JITTER)
.set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
r->backoff.Init(grpc_core::BackOff(backoff_options));
return &r->base;
}

@ -20,7 +20,9 @@
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <algorithm>
#include <cstring>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
@ -39,6 +41,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -48,7 +51,7 @@
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@ -118,8 +121,9 @@ struct grpc_subchannel {
external_state_watcher root_external_state_watcher;
/** backoff state */
grpc_backoff backoff_state;
grpc_backoff_result backoff_result;
grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
grpc_millis next_attempt_deadline;
grpc_millis min_connect_timeout_ms;
/** do we have an active alarm? */
bool have_alarm;
@ -274,6 +278,54 @@ void grpc_subchannel_weak_unref(
}
}
static void parse_args_for_backoff_values(
const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options,
grpc_millis* min_connect_timeout_ms) {
grpc_millis initial_backoff_ms =
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
*min_connect_timeout_ms =
GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
grpc_millis max_backoff_ms =
GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
bool fixed_reconnect_backoff = false;
if (args != nullptr) {
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key,
"grpc.testing.fixed_reconnect_backoff_ms")) {
fixed_reconnect_backoff = true;
initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
grpc_channel_arg_get_integer(
&args->args[i],
{static_cast<int>(initial_backoff_ms), 100, INT_MAX});
} else if (0 ==
strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
*min_connect_timeout_ms = grpc_channel_arg_get_integer(
&args->args[i],
{static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
} else if (0 ==
strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
max_backoff_ms = grpc_channel_arg_get_integer(
&args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
initial_backoff_ms = grpc_channel_arg_get_integer(
&args->args[i],
{static_cast<int>(initial_backoff_ms), 100, INT_MAX});
}
}
}
backoff_options->set_initial_backoff(initial_backoff_ms)
.set_multiplier(fixed_reconnect_backoff
? 1.0
: GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(fixed_reconnect_backoff ? 0.0
: GRPC_SUBCHANNEL_RECONNECT_JITTER)
.set_max_backoff(max_backoff_ms);
}
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_subchannel_args* args) {
grpc_subchannel_key* key = grpc_subchannel_key_create(args);
@ -324,43 +376,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
int initial_backoff_ms =
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
bool fixed_reconnect_backoff = false;
if (c->args) {
for (size_t i = 0; i < c->args->num_args; i++) {
if (0 == strcmp(c->args->args[i].key,
"grpc.testing.fixed_reconnect_backoff_ms")) {
fixed_reconnect_backoff = true;
initial_backoff_ms = min_backoff_ms = max_backoff_ms =
grpc_channel_arg_get_integer(&c->args->args[i],
{initial_backoff_ms, 100, INT_MAX});
} else if (0 == strcmp(c->args->args[i].key,
GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
min_backoff_ms = grpc_channel_arg_get_integer(
&c->args->args[i], {min_backoff_ms, 100, INT_MAX});
} else if (0 == strcmp(c->args->args[i].key,
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
max_backoff_ms = grpc_channel_arg_get_integer(
&c->args->args[i], {max_backoff_ms, 100, INT_MAX});
} else if (0 == strcmp(c->args->args[i].key,
GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
initial_backoff_ms = grpc_channel_arg_get_integer(
&c->args->args[i], {initial_backoff_ms, 100, INT_MAX});
}
}
}
grpc_backoff_init(
&c->backoff_state, initial_backoff_ms,
fixed_reconnect_backoff ? 1.0
: GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
min_backoff_ms, max_backoff_ms);
grpc_core::BackOff::Options backoff_options;
parse_args_for_backoff_values(args->args, &backoff_options,
&c->min_connect_timeout_ms);
c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(key, c);
@ -368,11 +387,11 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
static void continue_connect_locked(grpc_subchannel* c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
args.deadline = c->backoff_result.current_deadline;
const grpc_millis min_deadline =
c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now();
args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
@ -416,7 +435,7 @@ static void on_alarm(void* arg, grpc_error* error) {
}
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
c->backoff_result = grpc_backoff_step(&c->backoff_state);
c->next_attempt_deadline = c->backoff->Step();
continue_connect_locked(c);
gpr_mu_unlock(&c->mu);
} else {
@ -452,22 +471,20 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
if (!c->backoff_begun) {
c->backoff_begun = true;
c->backoff_result = grpc_backoff_begin(&c->backoff_state);
c->next_attempt_deadline = c->backoff->Begin();
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
c->have_alarm = true;
const grpc_millis time_til_next =
c->backoff_result.next_attempt_start_time -
grpc_core::ExecCtx::Get()->Now();
c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
if (time_til_next <= 0) {
gpr_log(GPR_INFO, "Retry immediately");
} else {
gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next);
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->backoff_result.next_attempt_start_time,
&c->on_alarm);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
}
}

@ -18,61 +18,53 @@
#include "src/core/lib/backoff/backoff.h"
#include <algorithm>
#include <grpc/support/useful.h>
void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff,
double multiplier, double jitter,
grpc_millis min_connect_timeout,
grpc_millis max_backoff) {
backoff->initial_backoff = initial_backoff;
backoff->multiplier = multiplier;
backoff->jitter = jitter;
backoff->min_connect_timeout = min_connect_timeout;
backoff->max_backoff = max_backoff;
backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec;
}
namespace grpc_core {
grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff) {
backoff->current_backoff = backoff->initial_backoff;
const grpc_millis initial_timeout =
GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout);
const grpc_millis now = grpc_core::ExecCtx::Get()->Now();
const grpc_backoff_result result = {now + initial_timeout,
now + backoff->current_backoff};
return result;
}
namespace {
/* Generate a random number between 0 and 1. */
static double generate_uniform_random_number(uint32_t* rng_state) {
*rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31);
return *rng_state / (double)((uint32_t)1 << 31);
/* Advance the generator held in *rng_state by one step and return the new
 * state scaled into [0, 1). The generator is hand-rolled because seeding
 * rand() modifies a global variable we have no control over. */
double generate_uniform_random_number(uint32_t* rng_state) {
  constexpr uint32_t kModulus = uint32_t(1) << 31;
  // The arithmetic is unsigned, so the multiplication wraps before the
  // modulus is applied.
  const uint32_t next_state = (1103515245 * *rng_state + 12345) % kModulus;
  *rng_state = next_state;
  return next_state / static_cast<double>(kModulus);
}
static double generate_uniform_random_number_between(uint32_t* rng_state,
double a, double b) {
/* Return a pseudo-random value in the interval spanned by a and b; the bounds
 * may be given in either order. When both bounds are equal that value is
 * returned and the generator state is left untouched. */
double generate_uniform_random_number_between(uint32_t* rng_state, double a,
                                              double b) {
  if (a == b) return a;
  // Order the bounds without modifying the arguments.
  const bool reversed = a > b;
  const double low = reversed ? b : a;
  const double high = reversed ? a : b;
  const double span = high - low;
  return low + generate_uniform_random_number(rng_state) * span;
}
} // namespace
grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff) {
backoff->current_backoff = (grpc_millis)(GPR_MIN(
backoff->current_backoff * backoff->multiplier, backoff->max_backoff));
const double jitter = generate_uniform_random_number_between(
&backoff->rng_state, -backoff->jitter * backoff->current_backoff,
backoff->jitter * backoff->current_backoff);
const grpc_millis current_timeout =
GPR_MAX((grpc_millis)(backoff->current_backoff + jitter),
backoff->min_connect_timeout);
const grpc_millis next_timeout = GPR_MIN(
(grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff);
const grpc_millis now = grpc_core::ExecCtx::Get()->Now();
const grpc_backoff_result result = {now + current_timeout,
now + next_timeout};
return result;
/// Builds a backoff tracker configured by \a options.
///
/// The current backoff starts at the configured initial backoff, so Step() is
/// well defined even when the caller has not gone through Begin() or Reset()
/// first (previously the member was left uninitialized). The random number
/// generator is seeded from the nanosecond field of the realtime clock.
BackOff::BackOff(const Options& options)
    : options_(options),
      current_backoff_(options.initial_backoff()),
      rng_state_(static_cast<uint32_t>(gpr_now(GPR_CLOCK_REALTIME).tv_nsec)) {}
/// Starts a retry loop: puts the current backoff at the configured initial
/// backoff and returns the current ExecCtx time plus that backoff.
grpc_millis BackOff::Begin() {
  current_backoff_ = options_.initial_backoff();
  const grpc_millis now = grpc_core::ExecCtx::Get()->Now();
  return current_backoff_ + now;
}
void grpc_backoff_reset(grpc_backoff* backoff) {
backoff->current_backoff = backoff->initial_backoff;
/// Advances the retry loop by one step.
///
/// The current backoff is multiplied by the configured multiplier and capped
/// at the configured maximum. A random jitter drawn from
/// [-jitter * backoff, +jitter * backoff] is then added to obtain the delay,
/// which is not re-capped after the jitter is applied.
///
/// \return the current ExecCtx time plus the jittered delay.
grpc_millis BackOff::Step() {
  const double grown = current_backoff_ * options_.multiplier();
  const double ceiling = static_cast<double>(options_.max_backoff());
  current_backoff_ = static_cast<grpc_millis>(std::min(grown, ceiling));

  const double jitter_bound = options_.jitter() * current_backoff_;
  const double jitter = generate_uniform_random_number_between(
      &rng_state_, -jitter_bound, jitter_bound);

  const grpc_millis delay = static_cast<grpc_millis>(current_backoff_ + jitter);
  return delay + grpc_core::ExecCtx::Get()->Now();
}
/// Puts the current backoff back at the configured initial backoff.
void BackOff::Reset() {
  current_backoff_ = options_.initial_backoff();
}
/// Replaces the state of the jitter random number generator with \a seed.
void BackOff::SetRandomSeed(uint32_t seed) {
  rng_state_ = seed;
}
} // namespace grpc_core

@ -21,53 +21,69 @@
#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct {
/// const: how long to wait after the first failure before retrying
grpc_millis initial_backoff;
/// const: factor with which to multiply backoff after a failed retry
double multiplier;
/// const: amount to randomize backoffs
double jitter;
/// const: minimum time between retries
grpc_millis min_connect_timeout;
/// const: maximum time between retries
grpc_millis max_backoff;
namespace grpc_core {
/// Implementation of the backoff mechanism described in
/// doc/connection-backoff.md
class BackOff {
public:
class Options;
/// Initialize backoff machinery - does not need to be destroyed
explicit BackOff(const Options& options);
/// Begin retry loop: returns the deadline to be used for the next attempt,
/// following the backoff strategy.
grpc_millis Begin();
/// Step a retry loop: returns the deadline to be used for the next attempt,
/// following the backoff strategy.
grpc_millis Step();
/// Reset the backoff: the current backoff goes back to the configured initial
/// backoff, the same value Begin() starts from.
void Reset();
/// Overrides the state of the jitter random number generator with \a seed.
/// The parameter type matches the out-of-line definition and the rng_state_
/// member, both of which use uint32_t.
void SetRandomSeed(uint32_t seed);
/// Parameters of the backoff strategy. Each setter returns *this so calls can
/// be chained. No field has a default value here, so every field a caller
/// relies on must be set explicitly.
class Options {
 public:
  /// how long to wait after the first failure before retrying
  grpc_millis initial_backoff() const { return initial_backoff_; }
  Options& set_initial_backoff(grpc_millis initial_backoff) {
    initial_backoff_ = initial_backoff;
    return *this;
  }

  /// factor with which to multiply backoff after a failed retry
  double multiplier() const { return multiplier_; }
  Options& set_multiplier(double multiplier) {
    multiplier_ = multiplier;
    return *this;
  }

  /// amount to randomize backoffs
  double jitter() const { return jitter_; }
  Options& set_jitter(double jitter) {
    jitter_ = jitter;
    return *this;
  }

  /// maximum time between retries
  grpc_millis max_backoff() const { return max_backoff_; }
  Options& set_max_backoff(grpc_millis max_backoff) {
    max_backoff_ = max_backoff;
    return *this;
  }

 private:
  grpc_millis initial_backoff_;
  double multiplier_;
  double jitter_;
  grpc_millis max_backoff_;
};  // class Options
private:
const Options options_;
/// current delay before retries
grpc_millis current_backoff;
/// random number generator
uint32_t rng_state;
} grpc_backoff;
typedef struct {
/// Deadline to be used for the current attempt.
grpc_millis current_deadline;
/// Deadline to be used for the next attempt, following the backoff strategy.
grpc_millis next_attempt_start_time;
} grpc_backoff_result;
/// Initialize backoff machinery - does not need to be destroyed
void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff,
double multiplier, double jitter,
grpc_millis min_connect_timeout,
grpc_millis max_backoff);
/// Begin retry loop: returns the deadlines to be used for the current attempt
/// and the subsequent retry, if any.
grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff);
/// Step a retry loop: returns the deadlines to be used for the current attempt
/// and the subsequent retry, if any.
grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff);
/// Reset the backoff, so the next grpc_backoff_step will be a
/// grpc_backoff_begin.
void grpc_backoff_reset(grpc_backoff* backoff);
grpc_millis current_backoff_;
uint32_t rng_state_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */

@ -212,6 +212,9 @@ finish:
fd = nullptr;
}
done = (--ac->refs == 0);
// Create a copy of the data from "ac" to be accessed after the unlock, as
// "ac" and its contents may be deallocated by the time they are read.
const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str);
gpr_mu_unlock(&ac->mu);
if (error != GRPC_ERROR_NONE) {
char* error_descr;
@ -225,9 +228,12 @@ finish:
gpr_free(error_descr);
gpr_free(desc);
error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
grpc_slice_from_copied_string(ac->addr_str));
addr_str_slice);
grpc_slice_unref(addr_str_slice);
}
if (done) {
// This is safe even outside the lock, because "done", the sentinel, is
// populated *inside* the lock.
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
grpc_channel_args_destroy(ac->channel_args);

@ -26,11 +26,14 @@ package(
grpc_cc_test(
name = "backoff_test",
srcs = ["backoff_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
deps = [
"//:grpc",
"//test/core/util:grpc_test_util",
"//:gpr",
"//:grpc",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
)

@ -18,137 +18,131 @@
#include "src/core/lib/backoff/backoff.h"
#include <grpc/grpc.h>
#include <algorithm>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include <gtest/gtest.h>
#include "test/core/util/test_config.h"
static void test_constant_backoff(void) {
grpc_backoff backoff;
namespace grpc {
namespace testing {
namespace {
using grpc_core::BackOff;
TEST(BackOffTest, ConstantBackOff) {
const grpc_millis initial_backoff = 200;
const double multiplier = 1.0;
const double jitter = 0.0;
const grpc_millis min_connect_timeout = 100;
const grpc_millis max_backoff = 1000;
grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter,
min_connect_timeout, max_backoff);
grpc_core::ExecCtx exec_ctx;
grpc_backoff_result next_deadlines = grpc_backoff_begin(&backoff);
GPR_ASSERT(next_deadlines.current_deadline -
grpc_core::ExecCtx::Get()->Now() ==
initial_backoff);
GPR_ASSERT(next_deadlines.next_attempt_start_time -
grpc_core::ExecCtx::Get()->Now() ==
initial_backoff);
BackOff::Options options;
options.set_initial_backoff(initial_backoff)
.set_multiplier(multiplier)
.set_jitter(jitter)
.set_max_backoff(max_backoff);
BackOff backoff(options);
grpc_millis next_attempt_start_time = backoff.Begin();
EXPECT_EQ(next_attempt_start_time - grpc_core::ExecCtx::Get()->Now(),
initial_backoff);
for (int i = 0; i < 10000; i++) {
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline -
grpc_core::ExecCtx::Get()->Now() ==
initial_backoff);
GPR_ASSERT(next_deadlines.next_attempt_start_time -
grpc_core::ExecCtx::Get()->Now() ==
initial_backoff);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_attempt_start_time = backoff.Step();
EXPECT_EQ(next_attempt_start_time - grpc_core::ExecCtx::Get()->Now(),
initial_backoff);
}
}
static void test_min_connect(void) {
grpc_backoff backoff;
TEST(BackOffTest, MinConnect) {
const grpc_millis initial_backoff = 100;
const double multiplier = 1.0;
const double jitter = 0.0;
const grpc_millis min_connect_timeout = 200;
const grpc_millis max_backoff = 1000;
grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter,
min_connect_timeout, max_backoff);
grpc_core::ExecCtx exec_ctx;
grpc_backoff_result next = grpc_backoff_begin(&backoff);
// Because the min_connect_timeout > initial_backoff, current_deadline is used
// as the deadline for the current attempt.
GPR_ASSERT(next.current_deadline - grpc_core::ExecCtx::Get()->Now() ==
min_connect_timeout);
// ... while, if the current attempt fails, the next one will happen after
// initial_backoff.
GPR_ASSERT(next.next_attempt_start_time - grpc_core::ExecCtx::Get()->Now() ==
initial_backoff);
BackOff::Options options;
options.set_initial_backoff(initial_backoff)
.set_multiplier(multiplier)
.set_jitter(jitter)
.set_max_backoff(max_backoff);
BackOff backoff(options);
grpc_millis next = backoff.Begin();
EXPECT_EQ(next - grpc_core::ExecCtx::Get()->Now(), initial_backoff);
}
static void test_no_jitter_backoff(void) {
grpc_backoff backoff;
TEST(BackOffTest, NoJitterBackOff) {
const grpc_millis initial_backoff = 2;
const double multiplier = 2.0;
const double jitter = 0.0;
const grpc_millis min_connect_timeout = 1;
const grpc_millis max_backoff = 513;
grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter,
min_connect_timeout, max_backoff);
BackOff::Options options;
options.set_initial_backoff(initial_backoff)
.set_multiplier(multiplier)
.set_jitter(jitter)
.set_max_backoff(max_backoff);
BackOff backoff(options);
// x_1 = 2
// x_n = 2**i + x_{i-1} ( = 2**(n+1) - 2 )
grpc_core::ExecCtx exec_ctx;
grpc_core::ExecCtx::Get()->TestOnlySetNow(0);
grpc_backoff_result next_deadlines = grpc_backoff_begin(&backoff);
GPR_ASSERT(next_deadlines.current_deadline ==
next_deadlines.next_attempt_start_time);
GPR_ASSERT(next_deadlines.current_deadline == 2);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 6);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 14);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 30);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 62);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 126);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 254);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 510);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 1022);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
grpc_millis next = backoff.Begin();
EXPECT_EQ(next, 2);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 6);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 14);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 30);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 62);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 126);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 254);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 510);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 1022);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
// Hit the maximum timeout. From this point onwards, retries will increase
// only by max timeout.
GPR_ASSERT(next_deadlines.current_deadline == 1535);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 2048);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
next_deadlines = grpc_backoff_step(&backoff);
GPR_ASSERT(next_deadlines.current_deadline == 2561);
EXPECT_EQ(next, 1535);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 2048);
grpc_core::ExecCtx::Get()->TestOnlySetNow(next);
next = backoff.Step();
EXPECT_EQ(next, 2561);
}
static void test_jitter_backoff(void) {
TEST(BackOffTest, JitterBackOff) {
const grpc_millis initial_backoff = 500;
grpc_millis current_backoff = initial_backoff;
const grpc_millis max_backoff = 1000;
const grpc_millis min_connect_timeout = 100;
const double multiplier = 1.0;
const double jitter = 0.1;
grpc_backoff backoff;
grpc_backoff_init(&backoff, initial_backoff, multiplier, jitter,
min_connect_timeout, max_backoff);
BackOff::Options options;
options.set_initial_backoff(initial_backoff)
.set_multiplier(multiplier)
.set_jitter(jitter)
.set_max_backoff(max_backoff);
BackOff backoff(options);
backoff.rng_state = 0; // force consistent PRNG
backoff.SetRandomSeed(0); // force consistent PRNG
grpc_core::ExecCtx exec_ctx;
grpc_backoff_result next_deadlines = grpc_backoff_begin(&backoff);
GPR_ASSERT(next_deadlines.current_deadline -
grpc_core::ExecCtx::Get()->Now() ==
initial_backoff);
GPR_ASSERT(next_deadlines.next_attempt_start_time -
grpc_core::ExecCtx::Get()->Now() ==
initial_backoff);
grpc_millis next = backoff.Begin();
EXPECT_EQ(next - grpc_core::ExecCtx::Get()->Now(), initial_backoff);
grpc_millis expected_next_lower_bound =
(grpc_millis)((double)current_backoff * (1 - jitter));
@ -156,33 +150,27 @@ static void test_jitter_backoff(void) {
(grpc_millis)((double)current_backoff * (1 + jitter));
for (int i = 0; i < 10000; i++) {
next_deadlines = grpc_backoff_step(&backoff);
next = backoff.Step();
// next-now must be within (jitter*100)% of the current backoff (which
// increases by * multiplier up to max_backoff).
const grpc_millis timeout_millis =
next_deadlines.current_deadline - grpc_core::ExecCtx::Get()->Now();
GPR_ASSERT(timeout_millis >= expected_next_lower_bound);
GPR_ASSERT(timeout_millis <= expected_next_upper_bound);
current_backoff = GPR_MIN(
const grpc_millis timeout_millis = next - grpc_core::ExecCtx::Get()->Now();
EXPECT_GE(timeout_millis, expected_next_lower_bound);
EXPECT_LE(timeout_millis, expected_next_upper_bound);
current_backoff = std::min(
(grpc_millis)((double)current_backoff * multiplier), max_backoff);
expected_next_lower_bound =
(grpc_millis)((double)current_backoff * (1 - jitter));
expected_next_upper_bound =
(grpc_millis)((double)current_backoff * (1 + jitter));
grpc_core::ExecCtx::Get()->TestOnlySetNow(next_deadlines.current_deadline);
}
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
gpr_time_init();
test_constant_backoff();
test_min_connect();
test_no_jitter_backoff();
test_jitter_backoff();
grpc_shutdown();
return 0;
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -28,6 +28,7 @@
#include <grpc++/server_builder.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
@ -35,6 +36,7 @@
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/support/env.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -48,10 +50,33 @@ using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using std::chrono::system_clock;
// defined in tcp_client_posix.c
extern void (*grpc_tcp_client_connect_impl)(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_millis deadline);
const auto original_tcp_connect_fn = grpc_tcp_client_connect_impl;
namespace grpc {
namespace testing {
namespace {
gpr_atm g_connection_delay_ms;
// Stand-in for grpc_tcp_client_connect_impl used to simulate a slow
// connection attempt. When g_connection_delay_ms holds a positive value, the
// calling thread sleeps for that many milliseconds first; the call is then
// forwarded to the original connect function with the deadline pushed back by
// the same amount.
void tcp_client_connect_with_delay(grpc_closure* closure, grpc_endpoint** ep,
                                   grpc_pollset_set* interested_parties,
                                   const grpc_channel_args* channel_args,
                                   const grpc_resolved_address* addr,
                                   grpc_millis deadline) {
  const int injected_delay_ms = gpr_atm_acq_load(&g_connection_delay_ms);
  const bool should_stall = injected_delay_ms > 0;
  if (should_stall) {
    const gpr_timespec wake_up_at =
        grpc_timeout_milliseconds_to_deadline(injected_delay_ms);
    gpr_sleep_until(wake_up_at);
  }
  // Extend the deadline by the injected delay before delegating.
  original_tcp_connect_fn(closure, ep, interested_parties, channel_args, addr,
                          deadline + injected_delay_ms);
}
// Subclass of TestServiceImpl that increments a request counter for
// every call to the Echo RPC.
class MyTestServiceImpl : public TestServiceImpl {
@ -136,22 +161,22 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc_lb_addresses_destroy(addresses);
}
void ResetStub(const grpc::string& lb_policy_name = "") {
ChannelArguments args;
std::vector<int> GetServersPorts() {
std::vector<int> ports;
for (const auto& server : servers_) ports.push_back(server->port_);
return ports;
}
void ResetStub(const std::vector<int>& ports,
const grpc::string& lb_policy_name,
ChannelArguments args = ChannelArguments()) {
if (lb_policy_name.size() > 0) {
args.SetLoadBalancingPolicyName(lb_policy_name);
} // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_);
args.SetInt("grpc.testing.fixed_reconnect_backoff_ms", 2000);
std::ostringstream uri;
uri << "fake:///";
for (size_t i = 0; i < servers_.size() - 1; ++i) {
uri << "127.0.0.1:" << servers_[i]->port_ << ",";
}
uri << "127.0.0.1:" << servers_[servers_.size() - 1]->port_;
channel_ =
CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
CreateCustomChannel("fake:///", InsecureChannelCredentials(), args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
@ -266,7 +291,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub(); // implicit pick first
ResetStub(GetServersPorts(), ""); // test that pick first is the default.
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
@ -290,11 +315,63 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
}
// Checks that GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS is honored by pick_first:
// the channel first fails to connect (no server is listening), a server is
// then started on the same port, and the time until the channel connects is
// compared against the configured initial backoff.
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
  ChannelArguments args;
  constexpr int kInitialBackOffMs = 100;
  args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
  ResetStub(ports, "pick_first", args);
  SetNextResolution(ports);
  // The channel won't become connected (there's no server).
  ASSERT_FALSE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
  // Bring up a server on the chosen port.
  StartServers(1, ports);
  // Now it will.
  ASSERT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
  const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
  const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
  // grpc_millis is not guaranteed to be `long` on every platform, so cast
  // explicitly to match the %ld conversion.
  gpr_log(GPR_DEBUG, "Waited %ld milliseconds", static_cast<long>(waited_ms));
  // We should have waited at least kInitialBackOffMs. We subtract one to
  // account for test and precision accuracy drift.
  EXPECT_GE(waited_ms, kInitialBackOffMs - 1);
  // But not much more.
  // NOTE(review): if grpc_timeout_milliseconds_to_deadline() computes its
  // result relative to the current time, this comparison is trivially true
  // because t1 already lies in the past -- confirm, and consider bounding
  // waited_ms directly instead.
  EXPECT_GT(
      gpr_time_cmp(
          grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1),
      0);
}
// Checks that GRPC_ARG_MIN_RECONNECT_BACKOFF_MS is honored by pick_first: the
// TCP connect function is replaced with one that stalls for longer than the
// minimum reconnect backoff, and the time spent waiting for the channel to
// connect must be at least that minimum.
TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
  ChannelArguments args;
  constexpr int kMinReconnectBackOffMs = 1000;
  args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  ResetStub(ports, "pick_first", args);
  SetNextResolution(ports);
  // Make the connection attempt take 10% longer than the minimum reconnect
  // backoff, to make sure we hit the codepath that waits for the min
  // reconnect backoff.
  gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
  // NOTE(review): the override installed here is never restored; later tests
  // keep going through tcp_client_connect_with_delay (with a zero delay once
  // this test finishes) -- confirm this is intended.
  grpc_tcp_client_connect_impl = tcp_client_connect_with_delay;
  const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
  channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
  const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
  const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
  // grpc_millis is not guaranteed to be `long` on every platform, so cast
  // explicitly to match the %ld conversion.
  gpr_log(GPR_DEBUG, "Waited %ld ms", static_cast<long>(waited_ms));
  // We should have waited at least kMinReconnectBackOffMs. We subtract one to
  // account for test and precision accuracy drift.
  EXPECT_GE(waited_ms, kMinReconnectBackOffMs - 1);
  // Stop delaying connection attempts.
  gpr_atm_rel_store(&g_connection_delay_ms, 0);
}
TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub(); // implicit pick first
ResetStub(GetServersPorts(), "pick_first");
std::vector<int> ports;
// Perform one RPC against the first server.
@ -340,7 +417,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub(); // implicit pick first
ResetStub(GetServersPorts(), "pick_first");
std::vector<int> ports;
// Perform one RPC against the first server.
@ -370,7 +447,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub(); // implicit pick first
ResetStub(GetServersPorts(), "pick_first");
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
@ -392,7 +469,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("round_robin");
ResetStub(GetServersPorts(), "round_robin");
std::vector<int> ports;
for (const auto& server : servers_) {
ports.emplace_back(server->port_);
@ -423,7 +500,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("round_robin");
ResetStub(GetServersPorts(), "round_robin");
std::vector<int> ports;
// Start with a single server.
@ -506,7 +583,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("round_robin");
ResetStub(GetServersPorts(), "round_robin");
std::vector<int> ports;
// Start with a single server.
@ -538,7 +615,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("round_robin");
ResetStub(GetServersPorts(), "round_robin");
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
@ -565,7 +642,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
ports.push_back(grpc_pick_unused_port_or_die());
}
StartServers(kNumServers, ports);
ResetStub("round_robin");
ResetStub(GetServersPorts(), "round_robin");
SetNextResolution(ports);
// Send a number of RPCs, which succeed.
for (size_t i = 0; i < 100; ++i) {

@ -99,23 +99,6 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c",
"name": "backoff_test",
"src": [
"test/core/backoff/backoff_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
@ -2497,6 +2480,23 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "backoff_test",
"src": [
"test/core/backoff/backoff_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",

@ -121,30 +121,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "backoff_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -2959,6 +2935,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c++",
"name": "backoff_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save