Added tests for initial and min connect backoffs

reviewable/pr13494/r13
David Garcia Quintas 7 years ago
parent a5e2da4096
commit af42ea715c
  1. 126
      test/cpp/end2end/client_lb_end2end_test.cc

@ -35,6 +35,7 @@
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/support/env.h" #include "src/core/lib/support/env.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -48,10 +49,34 @@ using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse; using grpc::testing::EchoResponse;
using std::chrono::system_clock; using std::chrono::system_clock;
// defined in tcp_client_posix.c
extern void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_millis deadline);
const auto original_tcp_connect_fn = grpc_tcp_client_connect_impl;
namespace grpc { namespace grpc {
namespace testing { namespace testing {
namespace { namespace {
int g_connection_delay_ms;
void tcp_client_connect_with_delay(grpc_exec_ctx* exec_ctx,
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_millis deadline) {
if (g_connection_delay_ms > 0) {
gpr_sleep_until(
grpc_timeout_milliseconds_to_deadline(g_connection_delay_ms));
}
original_tcp_connect_fn(exec_ctx, closure, ep, interested_parties,
channel_args, addr, deadline);
}
// Subclass of TestServiceImpl that increments a request counter for // Subclass of TestServiceImpl that increments a request counter for
// every call to the Echo RPC. // every call to the Echo RPC.
class MyTestServiceImpl : public TestServiceImpl { class MyTestServiceImpl : public TestServiceImpl {
@ -137,20 +162,29 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
void ResetStub(const grpc::string& lb_policy_name = "") { std::vector<int> GetServersPorts() {
ChannelArguments args; std::vector<int> ports;
std::transform(servers_.begin(), servers_.end(), std::back_inserter(ports),
[](const std::unique_ptr<ServerData>& server) {
return server->port_;
});
return ports;
}
void ResetStub(const std::vector<int>& ports,
const grpc::string& lb_policy_name,
ChannelArguments args = ChannelArguments()) {
if (lb_policy_name.size() > 0) { if (lb_policy_name.size() > 0) {
args.SetLoadBalancingPolicyName(lb_policy_name); args.SetLoadBalancingPolicyName(lb_policy_name);
} // else, default to pick first } // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_); response_generator_);
args.SetInt("grpc.testing.fixed_reconnect_backoff_ms", 2000);
std::ostringstream uri; std::ostringstream uri;
uri << "fake:///"; uri << "fake:///";
for (size_t i = 0; i < servers_.size() - 1; ++i) { for (size_t i = 0; i < ports.size() - 1; ++i) {
uri << "127.0.0.1:" << servers_[i]->port_ << ","; uri << "127.0.0.1:" << ports[i] << ",";
} }
uri << "127.0.0.1:" << servers_[servers_.size() - 1]->port_; uri << "127.0.0.1:" << ports.back();
channel_ = channel_ =
CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args); CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_); stub_ = grpc::testing::EchoTestService::NewStub(channel_);
@ -267,7 +301,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub(); // implicit pick first ResetStub(GetServersPorts(), "pick_first");
std::vector<int> ports; std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) { for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_); ports.emplace_back(servers_[i]->port_);
@ -291,11 +325,73 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName()); EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
ChannelArguments args;
constexpr int kInitialBackOffMs = 100;
args.SetInt("grpc.initial_reconnect_backoff_ms", kInitialBackOffMs);
// Start a server just to capture an available port number.
const int kNumServers = 1;
StartServers(kNumServers);
const auto ports = GetServersPorts();
// And immediate kill it so that requests would fail to initially connect.
servers_[0]->Shutdown(false);
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
ResetStub(ports, "pick_first", args);
SetNextResolution(ports);
// Client request should fail.
CheckRpcSendFailure();
// Bring servers back up on the same port (we aren't recreating the channel).
StartServers(kNumServers, ports);
// We simply send an RPC without paying attention to the result, even though
// in the vast majority of cases, the request would succeed. However, under
// high load, it may not. Waiting for the server here would however distort
// the backoff measurements.
SendRpc();
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
gpr_log(GPR_DEBUG, "Waited %ld milliseconds", waited_ms);
// We should have waited at least kInitialBackOffMs. We substract one because
// gRPC works with millisecond accuracy.
EXPECT_GE(waited_ms, kInitialBackOffMs - 1);
// But not much more.
EXPECT_GT(
gpr_time_cmp(
grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1),
0);
}
TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
ChannelArguments args;
constexpr int kMinReconnectBackOffMs = 1000;
args.SetInt("grpc.min_reconnect_backoff_ms", kMinReconnectBackOffMs);
// Start a server just to capture an available port number.
const int kNumServers = 1;
StartServers(kNumServers);
const auto ports = GetServersPorts();
ResetStub(ports, "pick_first", args);
SetNextResolution(ports);
// Make connection delay a 10% longer than it's willing to in order to make
// sure we are hitting the codepath that waits for the min reconnect backoff.
g_connection_delay_ms = kMinReconnectBackOffMs * 1.10;
grpc_tcp_client_connect_impl = tcp_client_connect_with_delay;
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
// We simply send an RPC without paying attention to the result: we only care
// about how long the subchannel waited for the connection.
SendRpc();
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
gpr_log(GPR_DEBUG, "Waited %ld ms", waited_ms);
// We should have waited at least kMinReconnectBackOffMs. We substract one
// because gRPC works with millisecond accuracy.
EXPECT_GE(waited_ms, kMinReconnectBackOffMs - 1);
grpc_tcp_client_connect_impl = original_tcp_connect_fn;
}
TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub(); // implicit pick first ResetStub(GetServersPorts(), "pick_first");
std::vector<int> ports; std::vector<int> ports;
// Perform one RPC against the first server. // Perform one RPC against the first server.
@ -341,7 +437,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub(); // implicit pick first ResetStub(GetServersPorts(), "pick_first");
std::vector<int> ports; std::vector<int> ports;
// Perform one RPC against the first server. // Perform one RPC against the first server.
@ -371,7 +467,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub(); // implicit pick first ResetStub(GetServersPorts(), "pick_first");
std::vector<int> ports; std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) { for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_); ports.emplace_back(servers_[i]->port_);
@ -393,7 +489,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub("round_robin"); ResetStub(GetServersPorts(), "round_robin");
std::vector<int> ports; std::vector<int> ports;
for (const auto& server : servers_) { for (const auto& server : servers_) {
ports.emplace_back(server->port_); ports.emplace_back(server->port_);
@ -424,7 +520,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub("round_robin"); ResetStub(GetServersPorts(), "round_robin");
std::vector<int> ports; std::vector<int> ports;
// Start with a single server. // Start with a single server.
@ -507,7 +603,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub("round_robin"); ResetStub(GetServersPorts(), "round_robin");
std::vector<int> ports; std::vector<int> ports;
// Start with a single server. // Start with a single server.
@ -539,7 +635,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub("round_robin"); ResetStub(GetServersPorts(), "round_robin");
std::vector<int> ports; std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) { for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_); ports.emplace_back(servers_[i]->port_);
@ -566,7 +662,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
ports.push_back(grpc_pick_unused_port_or_die()); ports.push_back(grpc_pick_unused_port_or_die());
} }
StartServers(kNumServers, ports); StartServers(kNumServers, ports);
ResetStub("round_robin"); ResetStub(GetServersPorts(), "round_robin");
SetNextResolution(ports); SetNextResolution(ports);
// Send a number of RPCs, which succeed. // Send a number of RPCs, which succeed.
for (size_t i = 0; i < 100; ++i) { for (size_t i = 0; i < 100; ++i) {

Loading…
Cancel
Save