client_lb_end2end_test: explicitly check failed RPC statuses (#29906)

* client_lb_end2end_test: explicitly check failed RPC statuses

* appease clang-tidy

* fix memory leak
pull/29436/head
Mark D. Roth 3 years ago committed by GitHub
parent 954f141c6c
commit 35b7d88654
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  2. 288
      test/cpp/end2end/client_lb_end2end_test.cc

@ -151,14 +151,14 @@ void FakeResolver::MaybeSendResultLocked() {
result_handler_->ReportResult(std::move(result));
return_failure_ = false;
} else if (has_next_result_) {
Result result;
result.addresses = std::move(next_result_.addresses);
result.service_config = std::move(next_result_.service_config);
// When both next_results_ and channel_args_ contain an arg with the same
// name, only the one in next_results_ will be kept since next_results_ is
// before channel_args_.
result.args = grpc_channel_args_union(next_result_.args, channel_args_);
result_handler_->ReportResult(std::move(result));
grpc_channel_args* new_args =
grpc_channel_args_union(next_result_.args, channel_args_);
grpc_channel_args_destroy(next_result_.args);
next_result_.args = new_args;
result_handler_->ReportResult(std::move(next_result_));
has_next_result_ = false;
}
}

@ -28,6 +28,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -201,6 +202,9 @@ class FakeResolverResponseGeneratorWrapper {
result.addresses->emplace_back(address.addr, address.len,
nullptr /* args */, std::move(attributes));
}
if (result.addresses->empty()) {
result.resolution_note = "fake resolver empty address list";
}
if (service_config_json != nullptr) {
grpc_error_handle error = GRPC_ERROR_NONE;
result.service_config = grpc_core::ServiceConfigImpl::Create(
@ -301,11 +305,10 @@ class ClientLbEnd2endTest : public ::testing::Test {
return grpc::CreateCustomChannel("fake:///", creds_, args);
}
bool SendRpc(
Status SendRpc(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
EchoResponse* response = nullptr, int timeout_ms = 1000,
Status* result = nullptr, bool wait_for_ready = false,
EchoRequest* request = nullptr) {
bool wait_for_ready = false, EchoRequest* request = nullptr) {
EchoResponse local_response;
if (response == nullptr) response = &local_response;
EchoRequest local_request;
@ -318,18 +321,16 @@ class ClientLbEnd2endTest : public ::testing::Test {
context.AddMetadata("foo", "1");
context.AddMetadata("bar", "2");
context.AddMetadata("baz", "3");
Status status = stub->Echo(&context, *request, response);
if (result != nullptr) *result = status;
return status.ok();
return stub->Echo(&context, *request, response);
}
void CheckRpcSendOk(
const grpc_core::DebugLocation& location,
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
const grpc_core::DebugLocation& location, bool wait_for_ready = false,
bool wait_for_ready = false,
xds::data::orca::v3::OrcaLoadReport* load_report = nullptr,
int timeout_ms = 2000) {
EchoResponse response;
Status status;
EchoRequest request;
EchoRequest* request_ptr = nullptr;
if (load_report != nullptr) {
@ -338,20 +339,26 @@ class ClientLbEnd2endTest : public ::testing::Test {
auto backend_metrics = params->mutable_backend_metrics();
*backend_metrics = *load_report;
}
const bool success = SendRpc(stub, &response, timeout_ms, &status,
wait_for_ready, request_ptr);
ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
<< "\nError: " << status.error_message() << " "
<< status.error_details();
Status status =
SendRpc(stub, &response, timeout_ms, wait_for_ready, request_ptr);
ASSERT_TRUE(status.ok())
<< "From " << location.file() << ":" << location.line()
<< "\nError: " << status.error_message() << " "
<< status.error_details();
ASSERT_EQ(response.message(), kRequestMessage)
<< "From " << location.file() << ":" << location.line();
if (!success) abort();
}
void CheckRpcSendFailure(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
const bool success = SendRpc(stub);
EXPECT_FALSE(success);
const grpc_core::DebugLocation& location,
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
StatusCode expected_status, absl::string_view expected_message) {
Status status = SendRpc(stub);
EXPECT_FALSE(status.ok());
EXPECT_EQ(expected_status, status.error_code())
<< location.file() << ":" << location.line();
EXPECT_EQ(expected_message, status.error_message())
<< location.file() << ":" << location.line();
}
struct ServerData {
@ -416,28 +423,37 @@ class ClientLbEnd2endTest : public ::testing::Test {
for (const auto& server : servers_) server->service_.ResetCounters();
}
bool SeenAllServers(size_t start_index, size_t stop_index) {
bool SeenAllServers(size_t start_index = 0, size_t stop_index = 0) {
if (stop_index == 0) stop_index = servers_.size();
for (size_t i = start_index; i < stop_index; ++i) {
if (servers_[i]->service_.request_count() == 0) return false;
}
return true;
}
// If status_check is null, all RPCs must succeed.
// If status_check is non-null, it will be called for all non-OK RPCs.
void WaitForServers(
const grpc_core::DebugLocation& location,
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
size_t start_index, size_t stop_index,
const grpc_core::DebugLocation& location, bool ignore_failure = false,
size_t start_index = 0, size_t stop_index = 0,
std::function<void(const Status&)> status_check = nullptr,
absl::Duration timeout = absl::Seconds(30)) {
if (stop_index == 0) stop_index = servers_.size();
auto deadline = absl::Now() + (timeout * grpc_test_slowdown_factor());
gpr_log(GPR_INFO,
"========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR
") ==========",
start_index, stop_index);
while (!SeenAllServers(start_index, stop_index)) {
if (ignore_failure) {
SendRpc(stub);
Status status = SendRpc(stub);
if (status_check != nullptr) {
if (!status.ok()) status_check(status);
} else {
CheckRpcSendOk(stub, location, true);
EXPECT_TRUE(status.ok())
<< " code=" << status.error_code() << " message=\""
<< status.error_message() << "\" at " << location.file() << ":"
<< location.line();
}
EXPECT_LE(absl::Now(), deadline)
<< " at " << location.file() << ":" << location.line();
@ -447,11 +463,12 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
void WaitForServer(
const grpc_core::DebugLocation& location,
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
size_t server_index, const grpc_core::DebugLocation& location,
bool ignore_failure = false) {
WaitForServers(stub, server_index, server_index + 1, location,
ignore_failure);
size_t server_index,
std::function<void(const Status&)> status_check = nullptr) {
WaitForServers(location, stub, server_index, server_index + 1,
status_check);
}
bool WaitForChannelState(
@ -482,13 +499,6 @@ class ClientLbEnd2endTest : public ::testing::Test {
return WaitForChannelState(channel, predicate, true, timeout_seconds);
}
bool SeenAllServers() {
for (const auto& server : servers_) {
if (server->service_.request_count() == 0) return false;
}
return true;
}
// Updates \a connection_order by appending to it the index of the newly
// connected server. Must be called after every single RPC.
void UpdateConnectionOrder(
@ -550,7 +560,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
// After sending RPC, channel state should be READY.
gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***");
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// After a period time not using the channel, the channel state should switch
// to IDLE.
@ -560,7 +570,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
// Sending a new RPC should awake the IDLE channel.
gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***");
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
}
@ -580,7 +590,7 @@ TEST_F(PickFirstTest, Basic) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
// All requests should have gone to a single server.
bool found = false;
@ -604,7 +614,7 @@ TEST_F(PickFirstTest, ProcessPending) {
"", response_generator); // test that pick first is the default.
auto stub = BuildStub(channel);
response_generator.SetNextResolution({servers_[0]->port_});
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
// Create a new channel and its corresponding PF LB policy, which will pick
// the subchannels in READY state from the previous RPC against the same
// target (even if it happened over a different channel, because subchannels
@ -614,7 +624,7 @@ TEST_F(PickFirstTest, ProcessPending) {
auto second_channel = BuildChannel("", second_response_generator);
auto second_stub = BuildStub(second_channel);
second_response_generator.SetNextResolution({servers_[0]->port_});
CheckRpcSendOk(second_stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, second_stub);
}
TEST_F(PickFirstTest, SelectsReadyAtStartup) {
@ -631,7 +641,7 @@ TEST_F(PickFirstTest, SelectsReadyAtStartup) {
auto stub1 = BuildStub(channel1);
response_generator1.SetNextResolution(ports);
// Wait for second server to be ready.
WaitForServer(stub1, 1, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub1, 1);
// Create a second channel with the same addresses. Its PF instance
// should immediately pick the second subchannel, since it's already
// in READY state.
@ -1017,7 +1027,7 @@ TEST_F(PickFirstTest, Updates) {
ports.emplace_back(servers_[0]->port_);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// An empty update will result in the channel going into TRANSIENT_FAILURE.
@ -1036,7 +1046,7 @@ TEST_F(PickFirstTest, Updates) {
ports.emplace_back(servers_[1]->port_);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [1] *******");
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 1);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
// And again for servers_[2]
@ -1044,7 +1054,7 @@ TEST_F(PickFirstTest, Updates) {
ports.emplace_back(servers_[2]->port_);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [2] *******");
WaitForServer(stub, 2, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 2);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
EXPECT_EQ(servers_[1]->service_.request_count(), 0);
@ -1066,7 +1076,7 @@ TEST_F(PickFirstTest, UpdateSuperset) {
ports.emplace_back(servers_[0]->port_);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
servers_[0]->service_.ResetCounters();
@ -1076,9 +1086,9 @@ TEST_F(PickFirstTest, UpdateSuperset) {
ports.emplace_back(servers_[0]->port_);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET superset *******");
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
// We stick to the previously connected server.
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
EXPECT_EQ(0, servers_[1]->service_.request_count());
// Check LB policy name for the channel.
@ -1099,7 +1109,7 @@ TEST_F(PickFirstTest, UpdateToUnconnected) {
ports.emplace_back(servers_[0]->port_);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
// Send resolution for which all servers are currently unavailable. Eventually
// this triggers replacing the existing working subchannel_list with the new
@ -1132,10 +1142,10 @@ TEST_F(PickFirstTest, GlobalSubchannelPool) {
auto channel2 = BuildChannel("pick_first", response_generator2);
auto stub2 = BuildStub(channel2);
response_generator2.SetNextResolution(ports);
WaitForServer(stub1, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub1, 0);
// Send one RPC on each channel.
CheckRpcSendOk(stub1, DEBUG_LOCATION);
CheckRpcSendOk(stub2, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub1);
CheckRpcSendOk(DEBUG_LOCATION, stub2);
// The server receives two requests.
EXPECT_EQ(2, servers_[0]->service_.request_count());
// The two requests are from the same client port, because the two channels
@ -1159,10 +1169,10 @@ TEST_F(PickFirstTest, LocalSubchannelPool) {
auto channel2 = BuildChannel("pick_first", response_generator2, args);
auto stub2 = BuildStub(channel2);
response_generator2.SetNextResolution(ports);
WaitForServer(stub1, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub1, 0);
// Send one RPC on each channel.
CheckRpcSendOk(stub1, DEBUG_LOCATION);
CheckRpcSendOk(stub2, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub1);
CheckRpcSendOk(DEBUG_LOCATION, stub2);
// The server receives two requests.
EXPECT_EQ(2, servers_[0]->service_.request_count());
// The two requests are from two client ports, because the two channels didn't
@ -1184,7 +1194,7 @@ TEST_F(PickFirstTest, ManyUpdates) {
response_generator.SetNextResolution(ports);
// We should re-enter core at the end of the loop to give the resolution
// setting closure a chance to run.
if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
if ((i + 1) % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
}
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
@ -1210,13 +1220,19 @@ TEST_F(PickFirstTest, ReresolutionNoSelected) {
// selected subchannel. Re-resolution will return the same result.
response_generator.SetNextResolution(dead_ports);
gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
for (size_t i = 0; i < 10; ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"failed to connect to all addresses");
}
// Set a re-resolution result that contains reachable ports, so that the
// pick_first LB policy can recover soon.
response_generator.SetNextResolutionUponError(alive_ports);
gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
CheckRpcSendOk(stub, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0, [](const Status& status) {
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
EXPECT_EQ("failed to connect to all addresses", status.error_message());
});
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
@ -1230,13 +1246,13 @@ TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
servers_[0]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
StartServers(1, ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
}
TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
@ -1249,13 +1265,13 @@ TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 1);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
servers_[1]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
StartServers(2, ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
}
TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
@ -1266,7 +1282,7 @@ TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
auto stub_1 = BuildStub(channel_1);
response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
WaitForServer(stub_1, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub_1, 0);
gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
servers_[0]->Shutdown();
// Channel 1 will receive a re-resolution containing the same server. It will
@ -1278,7 +1294,10 @@ TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
auto stub_2 = BuildStub(channel_2);
response_generator_2.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
WaitForServer(DEBUG_LOCATION, stub_2, 0, [](const Status& status) {
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
EXPECT_EQ("failed to connect to all addresses", status.error_message());
});
gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
servers_[0]->Shutdown();
// Wait until the disconnection has triggered the connectivity notification.
@ -1290,7 +1309,7 @@ TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
// The first call after the server restart will succeed.
CheckRpcSendOk(stub_2, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub_2);
gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
@ -1307,7 +1326,7 @@ TEST_F(PickFirstTest, IdleOnDisconnect) {
BuildChannel("", response_generator); // pick_first is the default.
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Stop server. Channel should go into state IDLE.
response_generator.SetFailureOnReresolution();
@ -1328,7 +1347,7 @@ TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) {
// Initially resolve to first server and make sure it connects.
gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
response_generator.SetNextResolution({servers_[0]->port_});
CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
CheckRpcSendOk(DEBUG_LOCATION, stub, true /* wait_for_ready */);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Send a resolution update with the remaining servers, none of which are
// running yet, so the update will stay pending. Note that it's important
@ -1341,7 +1360,7 @@ TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) {
"(not started) servers.");
response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
// RPCs will continue to be sent to the first server.
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
// Now stop the first server, so that the current subchannel list
// fails. This should cause us to immediately swap over to the
// pending list, even though it's not yet connected. The state should
@ -1363,7 +1382,10 @@ TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) {
// The channel should go to READY state and RPCs should go to the
// second server.
WaitForChannelReady(channel.get());
WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
WaitForServer(DEBUG_LOCATION, stub, 1, [](const Status& status) {
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
EXPECT_EQ("failed to connect to all addresses", status.error_message());
});
}
TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
@ -1375,7 +1397,7 @@ TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
BuildChannel("", response_generator); // pick_first is the default.
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Stop server. Channel should go into state IDLE.
servers_[0]->Shutdown();
@ -1390,7 +1412,7 @@ TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
// and then try to send an RPC. Channel should go back into state READY.
StartServer(0);
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
}
@ -1410,7 +1432,8 @@ TEST_F(PickFirstTest,
response_generator.SetNextResolution(ports);
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(false));
// Send an RPC, which should fail.
CheckRpcSendFailure(stub);
CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"failed to connect to all addresses");
// Channel should be in TRANSIENT_FAILURE.
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(false));
// Now start a server on the last port.
@ -1419,7 +1442,7 @@ TEST_F(PickFirstTest,
EXPECT_TRUE(channel->WaitForStateChange(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_timeout_seconds_to_deadline(4)));
EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
//
@ -1438,15 +1461,15 @@ TEST_F(RoundRobinTest, Basic) {
response_generator.SetNextResolution(GetServersPorts());
// Wait until all backends are ready.
do {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
} while (!SeenAllServers());
ResetCounters();
// "Sync" to the end of the list. Next sequence of picks will start at the
// first server (index 0).
WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, servers_.size() - 1);
std::vector<int> connection_order;
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
UpdateConnectionOrder(servers_, &connection_order);
}
// Backends should be iterated over in the order in which the addresses were
@ -1463,7 +1486,7 @@ TEST_F(RoundRobinTest, ProcessPending) {
auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
response_generator.SetNextResolution({servers_[0]->port_});
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
// Create a new channel and its corresponding RR LB policy, which will pick
// the subchannels in READY state from the previous RPC against the same
// target (even if it happened over a different channel, because subchannels
@ -1473,7 +1496,7 @@ TEST_F(RoundRobinTest, ProcessPending) {
auto second_channel = BuildChannel("round_robin", second_response_generator);
auto second_stub = BuildStub(second_channel);
second_response_generator.SetNextResolution({servers_[0]->port_});
CheckRpcSendOk(second_stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, second_stub);
}
TEST_F(RoundRobinTest, Updates) {
@ -1487,9 +1510,9 @@ TEST_F(RoundRobinTest, Updates) {
gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
std::vector<int> ports = {servers_[0]->port_};
response_generator.SetNextResolution(ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
// Send RPCs. They should all go servers_[0]
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@ -1502,8 +1525,8 @@ TEST_F(RoundRobinTest, Updates) {
// Wait until update has been processed, as signaled by the second backend
// receiving a request.
EXPECT_EQ(0, servers_[1]->service_.request_count());
WaitForServer(stub, 1, DEBUG_LOCATION);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 1);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(10, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@ -1513,8 +1536,8 @@ TEST_F(RoundRobinTest, Updates) {
ports.clear();
ports.emplace_back(servers_[2]->port_);
response_generator.SetNextResolution(ports);
WaitForServer(stub, 2, DEBUG_LOCATION);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 2);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(10, servers_[2]->service_.request_count());
@ -1526,9 +1549,9 @@ TEST_F(RoundRobinTest, Updates) {
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
response_generator.SetNextResolution(ports);
WaitForServers(stub, 0, 3, DEBUG_LOCATION);
WaitForServers(DEBUG_LOCATION, stub);
// Send three RPCs, one per server.
for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(1, servers_[0]->service_.request_count());
EXPECT_EQ(1, servers_[1]->service_.request_count());
EXPECT_EQ(1, servers_[2]->service_.request_count());
@ -1538,14 +1561,15 @@ TEST_F(RoundRobinTest, Updates) {
ports.clear();
response_generator.SetNextResolution(ports);
WaitForChannelNotReady(channel.get());
CheckRpcSendFailure(stub);
CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"empty address list: fake resolver empty address list");
servers_[0]->service_.ResetCounters();
// Next update introduces servers_[1], making the channel recover.
gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
ports.clear();
ports.emplace_back(servers_[1]->port_);
response_generator.SetNextResolution(ports);
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 1);
EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
@ -1560,7 +1584,7 @@ TEST_F(RoundRobinTest, UpdateInError) {
response_generator.SetNextResolution(GetServersPorts(0, 1));
// Send RPCs. They should all go to server 0.
for (size_t i = 0; i < 10; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/false,
CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/false,
/*load_report=*/nullptr, /*timeout_ms=*/4000);
}
EXPECT_EQ(10, servers_[0]->service_.request_count());
@ -1570,7 +1594,7 @@ TEST_F(RoundRobinTest, UpdateInError) {
std::vector<int> ports = {servers_[0]->port_, grpc_pick_unused_port_or_die(),
servers_[1]->port_};
response_generator.SetNextResolution(ports);
WaitForServers(stub, 0, 2, DEBUG_LOCATION, /*ignore_failure=*/false,
WaitForServers(DEBUG_LOCATION, stub, 0, 2, /*status_check=*/nullptr,
/*timeout=*/absl::Seconds(60));
// Send a bunch more RPCs. They should all succeed and should be
// split evenly between the two servers.
@ -1579,7 +1603,7 @@ TEST_F(RoundRobinTest, UpdateInError) {
// report READY before the subchannel for the unreachable server
// transitions from CONNECTING to TRANSIENT_FAILURE.
for (size_t i = 0; i < 10; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/false,
CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/false,
/*load_report=*/nullptr, /*timeout_ms=*/4000);
}
EXPECT_THAT(servers_[0]->service_.request_count(),
@ -1602,7 +1626,7 @@ TEST_F(RoundRobinTest, ManyUpdates) {
std::shuffle(ports.begin(), ports.end(),
std::mt19937(std::random_device()()));
response_generator.SetNextResolution(ports);
if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
if (i % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
}
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
@ -1619,7 +1643,7 @@ TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
std::vector<int> ports = {servers_[0]->port_, servers_[1]->port_};
response_generator.SetNextResolution(ports);
// Wait for both servers to be seen.
WaitForServers(stub, 0, 2, DEBUG_LOCATION);
WaitForServers(DEBUG_LOCATION, stub, 0, 2);
// Tell the fake resolver to send an update that adds the last server, but
// only when the LB policy requests re-resolution.
ports.push_back(servers_[2]->port_);
@ -1631,7 +1655,7 @@ TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
}
// Wait for the client to see server 2.
WaitForServer(stub, 2, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 2);
}
TEST_F(RoundRobinTest, TransientFailure) {
@ -1789,11 +1813,11 @@ TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
gpr_event_init(&ev);
std::thread thd([&]() {
gpr_log(GPR_INFO, "sending first RPC");
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
gpr_event_set(&ev, reinterpret_cast<void*>(1));
while (!shutdown.load()) {
gpr_log(GPR_INFO, "sending RPC");
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
});
// Wait for first RPC to complete.
@ -1838,11 +1862,11 @@ TEST_F(RoundRobinTest, SingleReconnect) {
auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
response_generator.SetNextResolution(ports);
for (size_t i = 0; i < kNumServers; ++i) {
WaitForServer(stub, i, DEBUG_LOCATION);
}
WaitForServers(DEBUG_LOCATION, stub);
// Sync to end of list.
WaitForServer(DEBUG_LOCATION, stub, servers_.size() - 1);
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
// One request should have gone to each server.
@ -1854,11 +1878,15 @@ TEST_F(RoundRobinTest, SingleReconnect) {
servers_[0]->Shutdown();
// Client request still succeed. May need retrying if RR had returned a pick
// before noticing the change in the server's connectivity.
while (!SendRpc(stub)) {
} // Retry until success.
while (true) {
Status status = SendRpc(stub);
if (status.ok()) break;
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
EXPECT_EQ("", status.error_message());
}
// Send a bunch of RPCs that should succeed.
for (int i = 0; i < 10 * kNumServers; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
const auto post_death = servers_[0]->service_.request_count();
// No requests have gone to the deceased server.
@ -1869,7 +1897,7 @@ TEST_F(RoundRobinTest, SingleReconnect) {
// the server managed to start before the RR policy retried the subchannel) or
// after the subchannel retry delay otherwise (RR's subchannel retried before
// the server was fully back up).
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
}
// If health checking is required by client but health checking service
@ -1885,7 +1913,7 @@ TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution({servers_[0]->port_});
EXPECT_TRUE(WaitForChannelReady(channel.get()));
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
TEST_F(RoundRobinTest, HealthChecking) {
@ -1913,7 +1941,7 @@ TEST_F(RoundRobinTest, HealthChecking) {
servers_[0]->SetServingStatus("health_check_service_name", true);
EXPECT_TRUE(WaitForChannelReady(channel.get()));
for (int i = 0; i < 10; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
@ -1921,9 +1949,9 @@ TEST_F(RoundRobinTest, HealthChecking) {
// Now set a second server to be healthy.
gpr_log(GPR_INFO, "*** server 2 healthy");
servers_[2]->SetServingStatus("health_check_service_name", true);
WaitForServer(stub, 2, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 2);
for (int i = 0; i < 10; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
EXPECT_EQ(5, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
@ -1931,9 +1959,9 @@ TEST_F(RoundRobinTest, HealthChecking) {
// Now set the remaining server to be healthy.
gpr_log(GPR_INFO, "*** server 1 healthy");
servers_[1]->SetServingStatus("health_check_service_name", true);
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 1);
for (int i = 0; i < 9; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
EXPECT_EQ(3, servers_[0]->service_.request_count());
EXPECT_EQ(3, servers_[1]->service_.request_count());
@ -1947,7 +1975,7 @@ TEST_F(RoundRobinTest, HealthChecking) {
do {
ResetCounters();
for (int i = 0; i < kNumServers; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
} while (servers_[1]->service_.request_count() != 2 &&
servers_[2]->service_.request_count() != 2);
@ -1957,7 +1985,8 @@ TEST_F(RoundRobinTest, HealthChecking) {
servers_[1]->SetServingStatus("health_check_service_name", false);
servers_[2]->SetServingStatus("health_check_service_name", false);
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
CheckRpcSendFailure(stub);
CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"connections to all backends failing");
// Clean up.
EnableDefaultHealthCheckService(false);
}
@ -1978,14 +2007,14 @@ TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) {
auto channel = BuildChannel("round_robin", response_generator, args);
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(DEBUG_LOCATION, stub, 0);
// Stop server 0 and send a new resolver result to ensure that RR
// checks each subchannel's state.
servers_[0]->Shutdown();
response_generator.SetNextResolution(GetServersPorts());
// Send a bunch more RPCs.
for (size_t i = 0; i < 100; i++) {
SendRpc(stub);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
}
@ -2013,13 +2042,14 @@ TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
// First channel should not become READY, because health checks should be
// failing.
EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
CheckRpcSendFailure(stub1);
CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
"connections to all backends failing");
// Second channel should be READY.
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
CheckRpcSendOk(stub2, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub2);
// Enable health checks on the backend and wait for channel 1 to succeed.
servers_[0]->SetServingStatus("health_check_service_name", true);
CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
CheckRpcSendOk(DEBUG_LOCATION, stub1, true /* wait_for_ready */);
// Check that we created only one subchannel to the backend.
EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Clean up.
@ -2056,13 +2086,14 @@ TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
// First channel should not become READY, because health checks should be
// failing.
EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
CheckRpcSendFailure(stub1);
CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
"connections to all backends failing");
// Second channel should be READY.
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
CheckRpcSendOk(stub2, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub2);
// Enable health checks for channel 1 and wait for it to succeed.
servers_[0]->SetServingStatus("health_check_service_name", true);
CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
CheckRpcSendOk(DEBUG_LOCATION, stub1, true /* wait_for_ready */);
// Check that we created only one subchannel to the backend.
EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Clean up.
@ -2164,7 +2195,7 @@ TEST_F(ClientLbPickArgsTest, Basic) {
// Check LB policy name for the channel.
EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName());
// Now send an RPC and check that the picker sees the expected data.
CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/true);
CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true);
auto pick_args_seen_list = args_seen_list();
EXPECT_THAT(pick_args_seen_list,
::testing::ElementsAre(::testing::AllOf(
@ -2267,7 +2298,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, StatusOk) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
// Send an OK RPC.
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
// Check LB policy name for the channel.
EXPECT_EQ("intercept_trailing_metadata_lb",
channel->GetLoadBalancingPolicyName());
@ -2286,9 +2317,8 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, StatusFailed) {
auto* expected_error = request.mutable_param()->mutable_expected_error();
expected_error->set_code(GRPC_STATUS_PERMISSION_DENIED);
expected_error->set_error_message("bummer, man");
Status status;
SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000, &status,
/*wait_for_ready=*/false, &request);
Status status = SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000,
/*wait_for_ready=*/false, &request);
EXPECT_EQ(status.error_code(), StatusCode::PERMISSION_DENIED);
EXPECT_EQ(status.error_message(), "bummer, man");
absl::Status status_seen_by_lb = last_status();
@ -2330,7 +2360,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
for (size_t i = 0; i < kNumRpcs; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
// Check LB policy name for the channel.
EXPECT_EQ("intercept_trailing_metadata_lb",
@ -2372,7 +2402,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
for (size_t i = 0; i < kNumRpcs; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
}
// Check LB policy name for the channel.
EXPECT_EQ("intercept_trailing_metadata_lb",
@ -2407,7 +2437,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
for (size_t i = 0; i < kNumRpcs; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION, false, &load_report);
CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
auto actual = backend_load_report();
ASSERT_TRUE(actual.has_value());
// TODO(roth): Change this to use EqualsProto() once that becomes
@ -2502,7 +2532,7 @@ TEST_F(ClientLbAddressTest, Basic) {
response_generator.SetNextResolution(GetServersPorts(), nullptr,
kAttributeKey,
absl::make_unique<Attribute>("foo"));
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
// Check LB policy name for the channel.
EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
// Make sure that the attributes wind up on the subchannels.
@ -2576,7 +2606,7 @@ TEST_F(OobBackendMetricTest, Basic) {
auto stub = BuildStub(channel);
response_generator.SetNextResolution(GetServersPorts());
// Send an OK RPC.
CheckRpcSendOk(stub, DEBUG_LOCATION);
CheckRpcSendOk(DEBUG_LOCATION, stub);
// Check LB policy name for the channel.
EXPECT_EQ("oob_backend_metric_test_lb",
channel->GetLoadBalancingPolicyName());

Loading…
Cancel
Save