Merge pull request #6033 from ctiller/channel_max_reconnect_backoff

Channel max reconnect backoff
pull/6029/head
Nicolas Noble 9 years ago
commit e3e91a3d3f
  1. 2
      include/grpc/impl/codegen/grpc_types.h
  2. 19
      src/core/lib/client_config/subchannel.c
  3. 8
      src/proto/grpc/testing/messages.proto
  4. 4
      src/proto/grpc/testing/test.proto
  5. 9
      test/core/util/reconnect_server.c
  6. 3
      test/core/util/reconnect_server.h
  7. 26
      test/cpp/interop/reconnect_interop_client.cc
  8. 10
      test/cpp/interop/reconnect_interop_server.cc
  9. 15
      test/cpp/util/create_test_channel.cc
  10. 8
      test/cpp/util/create_test_channel.h

@ -142,6 +142,8 @@ typedef struct {
/** Secondary user agent: goes at the end of the user-agent metadata
sent on each request */
#define GRPC_ARG_SECONDARY_USER_AGENT_STRING "grpc.secondary_user_agent"
/** The maximum time between subsequent connection attempts, in ms */
#define GRPC_ARG_MAX_RECONNECT_BACKOFF_MS "grpc.max_reconnect_backoff_ms"
/* The caller of the secure_channel_create functions may override the target
name used for SSL host name checking using this channel argument which is of
type GRPC_ARG_STRING. This *should* be used for testing only.

@ -352,6 +352,25 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
c->args->args[i].value.integer,
c->args->args[i].value.integer);
}
if (0 ==
strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
if (c->args->args[i].type == GRPC_ARG_INTEGER) {
if (c->args->args[i].value.integer >= 0) {
gpr_backoff_init(
&c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
GRPC_SUBCHANNEL_RECONNECT_JITTER,
GPR_MIN(c->args->args[i].value.integer,
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000),
c->args->args[i].value.integer);
} else {
gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
" : must be non-negative");
}
} else {
gpr_log(GPR_ERROR,
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer");
}
}
}
}
gpr_mu_init(&c->mu);

@ -1,5 +1,5 @@
// Copyright 2015, Google Inc.
// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@ -158,6 +158,12 @@ message StreamingOutputCallResponse {
Payload payload = 1;
}
// For reconnect interop test only.
// Client tells server what reconnection parameters it used.
message ReconnectParams {
int32 max_reconnect_backoff_ms = 1;
}
// For reconnect interop test only.
// Server tells client whether its reconnects are following the spec and the
// reconnect backoffs it saw.

@ -1,5 +1,5 @@
// Copyright 2015, Google Inc.
// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@ -80,6 +80,6 @@ service UnimplementedService {
// A service used to control reconnect server.
service ReconnectService {
rpc Start(grpc.testing.Empty) returns (grpc.testing.Empty);
rpc Start(grpc.testing.ReconnectParams) returns (grpc.testing.Empty);
rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo);
}

@ -60,8 +60,12 @@ static void pretty_print_backoffs(reconnect_server *server) {
i, backoff / 1000.0, expected_backoff / 1000.0,
(backoff - expected_backoff) * 100.0 / expected_backoff);
expected_backoff *= 1.6;
if (expected_backoff > 120 * 1000) {
expected_backoff = 120 * 1000;
int max_reconnect_backoff_ms = 120 * 1000;
if (server->max_reconnect_backoff_ms > 0) {
max_reconnect_backoff_ms = server->max_reconnect_backoff_ms;
}
if (expected_backoff > max_reconnect_backoff_ms) {
expected_backoff = max_reconnect_backoff_ms;
}
}
}
@ -108,6 +112,7 @@ void reconnect_server_init(reconnect_server *server) {
server->head = NULL;
server->tail = NULL;
server->peer = NULL;
server->max_reconnect_backoff_ms = 0;
}
void reconnect_server_start(reconnect_server *server, int port) {

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -52,6 +52,7 @@ typedef struct reconnect_server {
timestamp_list *head;
timestamp_list *tail;
char *peer;
int max_reconnect_backoff_ms;
} reconnect_server;
void reconnect_server_init(reconnect_server *server);

@ -37,6 +37,7 @@
#include <gflags/gflags.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/proto/grpc/testing/empty.grpc.pb.h"
@ -48,13 +49,18 @@
DEFINE_int32(server_control_port, 0, "Server port for control rpcs.");
DEFINE_int32(server_retry_port, 0, "Server port for testing reconnection.");
DEFINE_string(server_host, "127.0.0.1", "Server host to connect to");
DEFINE_int32(max_reconnect_backoff_ms, 0,
"Maximum backoff time, or 0 for default.");
using grpc::CallCredentials;
using grpc::Channel;
using grpc::ChannelArguments;
using grpc::ClientContext;
using grpc::CreateTestChannel;
using grpc::Status;
using grpc::testing::Empty;
using grpc::testing::ReconnectInfo;
using grpc::testing::ReconnectParams;
using grpc::testing::ReconnectService;
int main(int argc, char** argv) {
@ -68,17 +74,25 @@ int main(int argc, char** argv) {
ReconnectService::NewStub(
CreateTestChannel(server_address.str(), false)));
ClientContext start_context;
Empty empty_request;
ReconnectParams reconnect_params;
reconnect_params.set_max_reconnect_backoff_ms(FLAGS_max_reconnect_backoff_ms);
Empty empty_response;
Status start_status =
control_stub->Start(&start_context, empty_request, &empty_response);
control_stub->Start(&start_context, reconnect_params, &empty_response);
GPR_ASSERT(start_status.ok());
gpr_log(GPR_INFO, "Starting connections with retries.");
server_address.str("");
server_address << FLAGS_server_host << ':' << FLAGS_server_retry_port;
ChannelArguments channel_args;
if (FLAGS_max_reconnect_backoff_ms > 0) {
channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,
FLAGS_max_reconnect_backoff_ms);
}
std::shared_ptr<Channel> retry_channel =
CreateTestChannel(server_address.str(), true);
CreateTestChannel(server_address.str(), "foo.test.google.fr", true, false,
std::shared_ptr<CallCredentials>(), channel_args);
// About 13 retries.
const int kDeadlineSeconds = 540;
// Use any rpc to test retry.
@ -88,15 +102,15 @@ int main(int argc, char** argv) {
retry_context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(kDeadlineSeconds));
Status retry_status =
retry_stub->Start(&retry_context, empty_request, &empty_response);
retry_stub->Start(&retry_context, reconnect_params, &empty_response);
GPR_ASSERT(retry_status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED);
gpr_log(GPR_INFO, "Done retrying, getting final data from server");
ClientContext stop_context;
ReconnectInfo response;
Status stop_status =
control_stub->Stop(&stop_context, empty_request, &response);
Status stop_status = control_stub->Stop(&stop_context, Empty(), &response);
GPR_ASSERT(stop_status.ok());
GPR_ASSERT(response.passed() == true);
gpr_log(GPR_INFO, "Passed");
return 0;
}

@ -71,6 +71,7 @@ using grpc::Status;
using grpc::testing::Empty;
using grpc::testing::ReconnectService;
using grpc::testing::ReconnectInfo;
using grpc::testing::ReconnectParams;
static bool got_sigint = false;
@ -92,7 +93,8 @@ class ReconnectServiceImpl : public ReconnectService::Service {
void Poll(int seconds) { reconnect_server_poll(&tcp_server_, seconds); }
Status Start(ServerContext* context, const Empty* request, Empty* response) {
Status Start(ServerContext* context, const ReconnectParams* request,
Empty* response) {
bool start_server = true;
std::unique_lock<std::mutex> lock(mu_);
while (serving_ && !shutdown_) {
@ -105,6 +107,8 @@ class ReconnectServiceImpl : public ReconnectService::Service {
if (server_started_) {
start_server = false;
} else {
tcp_server_.max_reconnect_backoff_ms =
request->max_reconnect_backoff_ms();
server_started_ = true;
}
lock.unlock();
@ -133,7 +137,9 @@ class ReconnectServiceImpl : public ReconnectService::Service {
const double kTransmissionDelay = 100.0;
const double kBackoffMultiplier = 1.6;
const double kJitterFactor = 0.2;
const int kMaxBackoffMs = 120 * 1000;
const int kMaxBackoffMs = tcp_server_.max_reconnect_backoff_ms
? tcp_server_.max_reconnect_backoff_ms
: 120 * 1000;
bool passed = true;
for (timestamp_list* cur = tcp_server_.head; cur && cur->next;
cur = cur->next) {

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -58,8 +58,9 @@ namespace grpc {
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds) {
ChannelArguments channel_args;
const std::shared_ptr<CallCredentials>& creds,
const ChannelArguments& args) {
ChannelArguments channel_args(args);
if (enable_ssl) {
const char* roots_certs = use_prod_roots ? "" : test_root_cert;
SslCredentialsOptions ssl_opts = {roots_certs, "", ""};
@ -81,6 +82,14 @@ std::shared_ptr<Channel> CreateTestChannel(
}
}
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds) {
return CreateTestChannel(server, override_hostname, enable_ssl,
use_prod_roots, creds, ChannelArguments());
}
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots) {

@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -53,6 +53,12 @@ std::shared_ptr<Channel> CreateTestChannel(
bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds);
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds,
const ChannelArguments& args);
} // namespace grpc
#endif // GRPC_TEST_CPP_UTIL_CREATE_TEST_CHANNEL_H

Loading…
Cancel
Save