From 35b7d88654962e583cbc79b457b1628336371de9 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 7 Jun 2022 11:59:08 -0700 Subject: [PATCH] client_lb_end2end_test: explicitly check failed RPC statuses (#29906) * client_lb_end2end_test: explicitly check failed RPC statuses * appease clang-tidy * fix memory leak --- .../resolver/fake/fake_resolver.cc | 10 +- test/cpp/end2end/client_lb_end2end_test.cc | 288 ++++++++++-------- 2 files changed, 164 insertions(+), 134 deletions(-) diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 1121c148f99..16fb580cfcf 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -151,14 +151,14 @@ void FakeResolver::MaybeSendResultLocked() { result_handler_->ReportResult(std::move(result)); return_failure_ = false; } else if (has_next_result_) { - Result result; - result.addresses = std::move(next_result_.addresses); - result.service_config = std::move(next_result_.service_config); // When both next_results_ and channel_args_ contain an arg with the same // name, only the one in next_results_ will be kept since next_results_ is // before channel_args_. - result.args = grpc_channel_args_union(next_result_.args, channel_args_); - result_handler_->ReportResult(std::move(result)); + grpc_channel_args* new_args = + grpc_channel_args_union(next_result_.args, channel_args_); + grpc_channel_args_destroy(next_result_.args); + next_result_.args = new_args; + result_handler_->ReportResult(std::move(next_result_)); has_next_result_ = false; } } diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index c2c63a52230..31f4521a57c 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -28,6 +28,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" #include "absl/strings/str_join.h" +#include "absl/strings/string_view.h" #include #include @@ -201,6 +202,9 @@ class FakeResolverResponseGeneratorWrapper { result.addresses->emplace_back(address.addr, address.len, nullptr /* args */, std::move(attributes)); } + if (result.addresses->empty()) { + result.resolution_note = "fake resolver empty address list"; + } if (service_config_json != nullptr) { grpc_error_handle error = GRPC_ERROR_NONE; result.service_config = grpc_core::ServiceConfigImpl::Create( @@ -301,11 +305,10 @@ class ClientLbEnd2endTest : public ::testing::Test { return grpc::CreateCustomChannel("fake:///", creds_, args); } - bool SendRpc( + Status SendRpc( const std::unique_ptr& stub, EchoResponse* response = nullptr, int timeout_ms = 1000, - Status* result = nullptr, bool wait_for_ready = false, - EchoRequest* request = nullptr) { + bool wait_for_ready = false, EchoRequest* request = nullptr) { EchoResponse local_response; if (response == nullptr) response = &local_response; EchoRequest local_request; @@ -318,18 +321,16 @@ class ClientLbEnd2endTest : public ::testing::Test { context.AddMetadata("foo", "1"); context.AddMetadata("bar", "2"); context.AddMetadata("baz", "3"); - Status status = stub->Echo(&context, *request, response); - if (result != nullptr) *result = status; - return status.ok(); + return stub->Echo(&context, *request, response); } void CheckRpcSendOk( + const grpc_core::DebugLocation& location, const std::unique_ptr& stub, - const grpc_core::DebugLocation& location, bool wait_for_ready = false, + bool wait_for_ready = false, xds::data::orca::v3::OrcaLoadReport* load_report = nullptr, int timeout_ms = 2000) { EchoResponse response; - Status status; EchoRequest request; EchoRequest* request_ptr = nullptr; if (load_report != nullptr) { @@ -338,20 +339,26 @@ class ClientLbEnd2endTest : public ::testing::Test { auto backend_metrics = params->mutable_backend_metrics(); *backend_metrics = *load_report; } - const bool success = SendRpc(stub, &response, timeout_ms, &status, - wait_for_ready, request_ptr); - ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line() - << "\nError: " << status.error_message() << " " - << status.error_details(); + Status status = + SendRpc(stub, &response, timeout_ms, wait_for_ready, request_ptr); + ASSERT_TRUE(status.ok()) + << "From " << location.file() << ":" << location.line() + << "\nError: " << status.error_message() << " " + << status.error_details(); ASSERT_EQ(response.message(), kRequestMessage) << "From " << location.file() << ":" << location.line(); - if (!success) abort(); } void CheckRpcSendFailure( - const std::unique_ptr& stub) { - const bool success = SendRpc(stub); - EXPECT_FALSE(success); + const grpc_core::DebugLocation& location, + const std::unique_ptr& stub, + StatusCode expected_status, absl::string_view expected_message) { + Status status = SendRpc(stub); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(expected_status, status.error_code()) + << location.file() << ":" << location.line(); + EXPECT_EQ(expected_message, status.error_message()) + << location.file() << ":" << location.line(); } struct ServerData { @@ -416,28 +423,37 @@ class ClientLbEnd2endTest : public ::testing::Test { for (const auto& server : servers_) server->service_.ResetCounters(); } - bool SeenAllServers(size_t start_index, size_t stop_index) { + bool SeenAllServers(size_t start_index = 0, size_t stop_index = 0) { + if (stop_index == 0) stop_index = servers_.size(); for (size_t i = start_index; i < stop_index; ++i) { if (servers_[i]->service_.request_count() == 0) return false; } return true; } + // If status_check is null, all RPCs must succeed. + // If status_check is non-null, it will be called for all non-OK RPCs. void WaitForServers( + const grpc_core::DebugLocation& location, const std::unique_ptr& stub, - size_t start_index, size_t stop_index, - const grpc_core::DebugLocation& location, bool ignore_failure = false, + size_t start_index = 0, size_t stop_index = 0, + std::function status_check = nullptr, absl::Duration timeout = absl::Seconds(30)) { + if (stop_index == 0) stop_index = servers_.size(); auto deadline = absl::Now() + (timeout * grpc_test_slowdown_factor()); gpr_log(GPR_INFO, "========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR ") ==========", start_index, stop_index); while (!SeenAllServers(start_index, stop_index)) { - if (ignore_failure) { - SendRpc(stub); + Status status = SendRpc(stub); + if (status_check != nullptr) { + if (!status.ok()) status_check(status); } else { - CheckRpcSendOk(stub, location, true); + EXPECT_TRUE(status.ok()) + << " code=" << status.error_code() << " message=\"" + << status.error_message() << "\" at " << location.file() << ":" + << location.line(); } EXPECT_LE(absl::Now(), deadline) << " at " << location.file() << ":" << location.line(); @@ -447,11 +463,12 @@ class ClientLbEnd2endTest : public ::testing::Test { } void WaitForServer( + const grpc_core::DebugLocation& location, const std::unique_ptr& stub, - size_t server_index, const grpc_core::DebugLocation& location, - bool ignore_failure = false) { - WaitForServers(stub, server_index, server_index + 1, location, - ignore_failure); + size_t server_index, + std::function status_check = nullptr) { + WaitForServers(location, stub, server_index, server_index + 1, + status_check); } bool WaitForChannelState( @@ -482,13 +499,6 @@ class ClientLbEnd2endTest : public ::testing::Test { return WaitForChannelState(channel, predicate, true, timeout_seconds); } - bool SeenAllServers() { - for (const auto& server : servers_) { - if (server->service_.request_count() == 0) return false; - } - return true; - } - // Updates \a connection_order by appending to it the index of the newly // connected server. Must be called after every single RPC. void UpdateConnectionOrder( @@ -550,7 +560,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) { // After sending RPC, channel state should be READY. gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***"); response_generator.SetNextResolution(GetServersPorts()); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // After a period time not using the channel, the channel state should switch // to IDLE. @@ -560,7 +570,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) { // Sending a new RPC should awake the IDLE channel. gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***"); response_generator.SetNextResolution(GetServersPorts()); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); } @@ -580,7 +590,7 @@ TEST_F(PickFirstTest, Basic) { auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); for (size_t i = 0; i < servers_.size(); ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } // All requests should have gone to a single server. bool found = false; @@ -604,7 +614,7 @@ TEST_F(PickFirstTest, ProcessPending) { "", response_generator); // test that pick first is the default. auto stub = BuildStub(channel); response_generator.SetNextResolution({servers_[0]->port_}); - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); // Create a new channel and its corresponding PF LB policy, which will pick // the subchannels in READY state from the previous RPC against the same // target (even if it happened over a different channel, because subchannels @@ -614,7 +624,7 @@ TEST_F(PickFirstTest, ProcessPending) { auto second_channel = BuildChannel("", second_response_generator); auto second_stub = BuildStub(second_channel); second_response_generator.SetNextResolution({servers_[0]->port_}); - CheckRpcSendOk(second_stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, second_stub); } TEST_F(PickFirstTest, SelectsReadyAtStartup) { @@ -631,7 +641,7 @@ TEST_F(PickFirstTest, SelectsReadyAtStartup) { auto stub1 = BuildStub(channel1); response_generator1.SetNextResolution(ports); // Wait for second server to be ready. - WaitForServer(stub1, 1, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub1, 1); // Create a second channel with the same addresses. Its PF instance // should immediately pick the second subchannel, since it's already // in READY state. @@ -1017,7 +1027,7 @@ TEST_F(PickFirstTest, Updates) { ports.emplace_back(servers_[0]->port_); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(servers_[0]->service_.request_count(), 1); // An empty update will result in the channel going into TRANSIENT_FAILURE. @@ -1036,7 +1046,7 @@ TEST_F(PickFirstTest, Updates) { ports.emplace_back(servers_[1]->port_); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [1] *******"); - WaitForServer(stub, 1, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 1); EXPECT_EQ(servers_[0]->service_.request_count(), 0); // And again for servers_[2] @@ -1044,7 +1054,7 @@ TEST_F(PickFirstTest, Updates) { ports.emplace_back(servers_[2]->port_); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [2] *******"); - WaitForServer(stub, 2, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 2); EXPECT_EQ(servers_[0]->service_.request_count(), 0); EXPECT_EQ(servers_[1]->service_.request_count(), 0); @@ -1066,7 +1076,7 @@ TEST_F(PickFirstTest, UpdateSuperset) { ports.emplace_back(servers_[0]->port_); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(servers_[0]->service_.request_count(), 1); servers_[0]->service_.ResetCounters(); @@ -1076,9 +1086,9 @@ TEST_F(PickFirstTest, UpdateSuperset) { ports.emplace_back(servers_[0]->port_); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET superset *******"); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); // We stick to the previously connected server. - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); EXPECT_EQ(0, servers_[1]->service_.request_count()); // Check LB policy name for the channel. @@ -1099,7 +1109,7 @@ TEST_F(PickFirstTest, UpdateToUnconnected) { ports.emplace_back(servers_[0]->port_); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); // Send resolution for which all servers are currently unavailable. Eventually // this triggers replacing the existing working subchannel_list with the new @@ -1132,10 +1142,10 @@ TEST_F(PickFirstTest, GlobalSubchannelPool) { auto channel2 = BuildChannel("pick_first", response_generator2); auto stub2 = BuildStub(channel2); response_generator2.SetNextResolution(ports); - WaitForServer(stub1, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub1, 0); // Send one RPC on each channel. - CheckRpcSendOk(stub1, DEBUG_LOCATION); - CheckRpcSendOk(stub2, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub1); + CheckRpcSendOk(DEBUG_LOCATION, stub2); // The server receives two requests. EXPECT_EQ(2, servers_[0]->service_.request_count()); // The two requests are from the same client port, because the two channels @@ -1159,10 +1169,10 @@ TEST_F(PickFirstTest, LocalSubchannelPool) { auto channel2 = BuildChannel("pick_first", response_generator2, args); auto stub2 = BuildStub(channel2); response_generator2.SetNextResolution(ports); - WaitForServer(stub1, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub1, 0); // Send one RPC on each channel. - CheckRpcSendOk(stub1, DEBUG_LOCATION); - CheckRpcSendOk(stub2, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub1); + CheckRpcSendOk(DEBUG_LOCATION, stub2); // The server receives two requests. EXPECT_EQ(2, servers_[0]->service_.request_count()); // The two requests are from two client ports, because the two channels didn't @@ -1184,7 +1194,7 @@ TEST_F(PickFirstTest, ManyUpdates) { response_generator.SetNextResolution(ports); // We should re-enter core at the end of the loop to give the resolution // setting closure a chance to run. - if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION); + if ((i + 1) % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub); } // Check LB policy name for the channel. EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); @@ -1210,13 +1220,19 @@ TEST_F(PickFirstTest, ReresolutionNoSelected) { // selected subchannel. Re-resolution will return the same result. response_generator.SetNextResolution(dead_ports); gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******"); - for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub); + for (size_t i = 0; i < 10; ++i) { + CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE, + "failed to connect to all addresses"); + } // Set a re-resolution result that contains reachable ports, so that the // pick_first LB policy can recover soon. response_generator.SetNextResolutionUponError(alive_ports); gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******"); - WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */); - CheckRpcSendOk(stub, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0, [](const Status& status) { + EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); + EXPECT_EQ("failed to connect to all addresses", status.error_message()); + }); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(servers_[0]->service_.request_count(), 1); // Check LB policy name for the channel. EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); @@ -1230,13 +1246,13 @@ TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) { auto stub = BuildStub(channel); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******"); - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); gpr_log(GPR_INFO, "****** STOPPING SERVER ******"); servers_[0]->Shutdown(); EXPECT_TRUE(WaitForChannelNotReady(channel.get())); gpr_log(GPR_INFO, "****** RESTARTING SERVER ******"); StartServers(1, ports); - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); } TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) { @@ -1249,13 +1265,13 @@ TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) { auto stub = BuildStub(channel); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******"); - WaitForServer(stub, 1, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 1); gpr_log(GPR_INFO, "****** STOPPING SERVER ******"); servers_[1]->Shutdown(); EXPECT_TRUE(WaitForChannelNotReady(channel.get())); gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******"); StartServers(2, ports); - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); } TEST_F(PickFirstTest, CheckStateBeforeStartWatch) { @@ -1266,7 +1282,7 @@ TEST_F(PickFirstTest, CheckStateBeforeStartWatch) { auto stub_1 = BuildStub(channel_1); response_generator.SetNextResolution(ports); gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******"); - WaitForServer(stub_1, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub_1, 0); gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******"); servers_[0]->Shutdown(); // Channel 1 will receive a re-resolution containing the same server. It will @@ -1278,7 +1294,10 @@ TEST_F(PickFirstTest, CheckStateBeforeStartWatch) { auto stub_2 = BuildStub(channel_2); response_generator_2.SetNextResolution(ports); gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******"); - WaitForServer(stub_2, 0, DEBUG_LOCATION, true); + WaitForServer(DEBUG_LOCATION, stub_2, 0, [](const Status& status) { + EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); + EXPECT_EQ("failed to connect to all addresses", status.error_message()); + }); gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******"); servers_[0]->Shutdown(); // Wait until the disconnection has triggered the connectivity notification. @@ -1290,7 +1309,7 @@ TEST_F(PickFirstTest, CheckStateBeforeStartWatch) { gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******"); gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******"); // The first call after the server restart will succeed. - CheckRpcSendOk(stub_2, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub_2); gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******"); // Check LB policy name for the channel. EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName()); @@ -1307,7 +1326,7 @@ TEST_F(PickFirstTest, IdleOnDisconnect) { BuildChannel("", response_generator); // pick_first is the default. auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // Stop server. Channel should go into state IDLE. response_generator.SetFailureOnReresolution(); @@ -1328,7 +1347,7 @@ TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) { // Initially resolve to first server and make sure it connects. gpr_log(GPR_INFO, "Phase 1: Connect to first server."); response_generator.SetNextResolution({servers_[0]->port_}); - CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */); + CheckRpcSendOk(DEBUG_LOCATION, stub, true /* wait_for_ready */); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // Send a resolution update with the remaining servers, none of which are // running yet, so the update will stay pending. Note that it's important @@ -1341,7 +1360,7 @@ TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) { "(not started) servers."); response_generator.SetNextResolution(GetServersPorts(1 /* start_index */)); // RPCs will continue to be sent to the first server. - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); // Now stop the first server, so that the current subchannel list // fails. This should cause us to immediately swap over to the // pending list, even though it's not yet connected. The state should @@ -1363,7 +1382,10 @@ TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) { // The channel should go to READY state and RPCs should go to the // second server. WaitForChannelReady(channel.get()); - WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */); + WaitForServer(DEBUG_LOCATION, stub, 1, [](const Status& status) { + EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); + EXPECT_EQ("failed to connect to all addresses", status.error_message()); + }); } TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) { @@ -1375,7 +1397,7 @@ TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) { BuildChannel("", response_generator); // pick_first is the default. auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // Stop server. Channel should go into state IDLE. servers_[0]->Shutdown(); @@ -1390,7 +1412,7 @@ TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) { // and then try to send an RPC. Channel should go back into state READY. StartServer(0); response_generator.SetNextResolution(GetServersPorts()); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); } @@ -1410,7 +1432,8 @@ TEST_F(PickFirstTest, response_generator.SetNextResolution(ports); EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(false)); // Send an RPC, which should fail. - CheckRpcSendFailure(stub); + CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE, + "failed to connect to all addresses"); // Channel should be in TRANSIENT_FAILURE. EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(false)); // Now start a server on the last port. @@ -1419,7 +1442,7 @@ TEST_F(PickFirstTest, EXPECT_TRUE(channel->WaitForStateChange(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_timeout_seconds_to_deadline(4))); EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(false)); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } // @@ -1438,15 +1461,15 @@ TEST_F(RoundRobinTest, Basic) { response_generator.SetNextResolution(GetServersPorts()); // Wait until all backends are ready. do { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } while (!SeenAllServers()); ResetCounters(); // "Sync" to the end of the list. Next sequence of picks will start at the // first server (index 0). - WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, servers_.size() - 1); std::vector connection_order; for (size_t i = 0; i < servers_.size(); ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); UpdateConnectionOrder(servers_, &connection_order); } // Backends should be iterated over in the order in which the addresses were @@ -1463,7 +1486,7 @@ TEST_F(RoundRobinTest, ProcessPending) { auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); response_generator.SetNextResolution({servers_[0]->port_}); - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); // Create a new channel and its corresponding RR LB policy, which will pick // the subchannels in READY state from the previous RPC against the same // target (even if it happened over a different channel, because subchannels @@ -1473,7 +1496,7 @@ TEST_F(RoundRobinTest, ProcessPending) { auto second_channel = BuildChannel("round_robin", second_response_generator); auto second_stub = BuildStub(second_channel); second_response_generator.SetNextResolution({servers_[0]->port_}); - CheckRpcSendOk(second_stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, second_stub); } TEST_F(RoundRobinTest, Updates) { @@ -1487,9 +1510,9 @@ TEST_F(RoundRobinTest, Updates) { gpr_log(GPR_INFO, "*** FIRST BACKEND ***"); std::vector ports = {servers_[0]->port_}; response_generator.SetNextResolution(ports); - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); // Send RPCs. They should all go servers_[0] - for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(10, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); @@ -1502,8 +1525,8 @@ TEST_F(RoundRobinTest, Updates) { // Wait until update has been processed, as signaled by the second backend // receiving a request. EXPECT_EQ(0, servers_[1]->service_.request_count()); - WaitForServer(stub, 1, DEBUG_LOCATION); - for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 1); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(10, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); @@ -1513,8 +1536,8 @@ TEST_F(RoundRobinTest, Updates) { ports.clear(); ports.emplace_back(servers_[2]->port_); response_generator.SetNextResolution(ports); - WaitForServer(stub, 2, DEBUG_LOCATION); - for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 2); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(10, servers_[2]->service_.request_count()); @@ -1526,9 +1549,9 @@ TEST_F(RoundRobinTest, Updates) { ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[2]->port_); response_generator.SetNextResolution(ports); - WaitForServers(stub, 0, 3, DEBUG_LOCATION); + WaitForServers(DEBUG_LOCATION, stub); // Send three RPCs, one per server. - for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); + for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(1, servers_[0]->service_.request_count()); EXPECT_EQ(1, servers_[1]->service_.request_count()); EXPECT_EQ(1, servers_[2]->service_.request_count()); @@ -1538,14 +1561,15 @@ TEST_F(RoundRobinTest, Updates) { ports.clear(); response_generator.SetNextResolution(ports); WaitForChannelNotReady(channel.get()); - CheckRpcSendFailure(stub); + CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE, + "empty address list: fake resolver empty address list"); servers_[0]->service_.ResetCounters(); // Next update introduces servers_[1], making the channel recover. gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***"); ports.clear(); ports.emplace_back(servers_[1]->port_); response_generator.SetNextResolution(ports); - WaitForServer(stub, 1, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 1); EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false)); // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); @@ -1560,7 +1584,7 @@ TEST_F(RoundRobinTest, UpdateInError) { response_generator.SetNextResolution(GetServersPorts(0, 1)); // Send RPCs. They should all go to server 0. for (size_t i = 0; i < 10; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/false, + CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/false, /*load_report=*/nullptr, /*timeout_ms=*/4000); } EXPECT_EQ(10, servers_[0]->service_.request_count()); @@ -1570,7 +1594,7 @@ TEST_F(RoundRobinTest, UpdateInError) { std::vector ports = {servers_[0]->port_, grpc_pick_unused_port_or_die(), servers_[1]->port_}; response_generator.SetNextResolution(ports); - WaitForServers(stub, 0, 2, DEBUG_LOCATION, /*ignore_failure=*/false, + WaitForServers(DEBUG_LOCATION, stub, 0, 2, /*status_check=*/nullptr, /*timeout=*/absl::Seconds(60)); // Send a bunch more RPCs. They should all succeed and should be // split evenly between the two servers. @@ -1579,7 +1603,7 @@ TEST_F(RoundRobinTest, UpdateInError) { // report READY before the subchannel for the unreachable server // transitions from CONNECTING to TRANSIENT_FAILURE. for (size_t i = 0; i < 10; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/false, + CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/false, /*load_report=*/nullptr, /*timeout_ms=*/4000); } EXPECT_THAT(servers_[0]->service_.request_count(), @@ -1602,7 +1626,7 @@ TEST_F(RoundRobinTest, ManyUpdates) { std::shuffle(ports.begin(), ports.end(), std::mt19937(std::random_device()())); response_generator.SetNextResolution(ports); - if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION); + if (i % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub); } // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); @@ -1619,7 +1643,7 @@ TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) { std::vector ports = {servers_[0]->port_, servers_[1]->port_}; response_generator.SetNextResolution(ports); // Wait for both servers to be seen. - WaitForServers(stub, 0, 2, DEBUG_LOCATION); + WaitForServers(DEBUG_LOCATION, stub, 0, 2); // Tell the fake resolver to send an update that adds the last server, but // only when the LB policy requests re-resolution. ports.push_back(servers_[2]->port_); @@ -1631,7 +1655,7 @@ TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) { grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways(); } // Wait for the client to see server 2. - WaitForServer(stub, 2, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 2); } TEST_F(RoundRobinTest, TransientFailure) { @@ -1789,11 +1813,11 @@ TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) { gpr_event_init(&ev); std::thread thd([&]() { gpr_log(GPR_INFO, "sending first RPC"); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); gpr_event_set(&ev, reinterpret_cast(1)); while (!shutdown.load()) { gpr_log(GPR_INFO, "sending RPC"); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } }); // Wait for first RPC to complete. @@ -1838,11 +1862,11 @@ TEST_F(RoundRobinTest, SingleReconnect) { auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); response_generator.SetNextResolution(ports); - for (size_t i = 0; i < kNumServers; ++i) { - WaitForServer(stub, i, DEBUG_LOCATION); - } + WaitForServers(DEBUG_LOCATION, stub); + // Sync to end of list. + WaitForServer(DEBUG_LOCATION, stub, servers_.size() - 1); for (size_t i = 0; i < servers_.size(); ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; } // One request should have gone to each server. @@ -1854,11 +1878,15 @@ TEST_F(RoundRobinTest, SingleReconnect) { servers_[0]->Shutdown(); // Client request still succeed. May need retrying if RR had returned a pick // before noticing the change in the server's connectivity. - while (!SendRpc(stub)) { - } // Retry until success. + while (true) { + Status status = SendRpc(stub); + if (status.ok()) break; + EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); + EXPECT_EQ("", status.error_message()); + } // Send a bunch of RPCs that should succeed. for (int i = 0; i < 10 * kNumServers; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } const auto post_death = servers_[0]->service_.request_count(); // No requests have gone to the deceased server. @@ -1869,7 +1897,7 @@ TEST_F(RoundRobinTest, SingleReconnect) { // the server managed to start before the RR policy retried the subchannel) or // after the subchannel retry delay otherwise (RR's subchannel retried before // the server was fully back up). - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); } // If health checking is required by client but health checking service @@ -1885,7 +1913,7 @@ TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) { auto stub = BuildStub(channel); response_generator.SetNextResolution({servers_[0]->port_}); EXPECT_TRUE(WaitForChannelReady(channel.get())); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } TEST_F(RoundRobinTest, HealthChecking) { @@ -1913,7 +1941,7 @@ TEST_F(RoundRobinTest, HealthChecking) { servers_[0]->SetServingStatus("health_check_service_name", true); EXPECT_TRUE(WaitForChannelReady(channel.get())); for (int i = 0; i < 10; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } EXPECT_EQ(10, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); @@ -1921,9 +1949,9 @@ TEST_F(RoundRobinTest, HealthChecking) { // Now set a second server to be healthy. gpr_log(GPR_INFO, "*** server 2 healthy"); servers_[2]->SetServingStatus("health_check_service_name", true); - WaitForServer(stub, 2, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 2); for (int i = 0; i < 10; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } EXPECT_EQ(5, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); @@ -1931,9 +1959,9 @@ TEST_F(RoundRobinTest, HealthChecking) { // Now set the remaining server to be healthy. gpr_log(GPR_INFO, "*** server 1 healthy"); servers_[1]->SetServingStatus("health_check_service_name", true); - WaitForServer(stub, 1, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 1); for (int i = 0; i < 9; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } EXPECT_EQ(3, servers_[0]->service_.request_count()); EXPECT_EQ(3, servers_[1]->service_.request_count()); @@ -1947,7 +1975,7 @@ TEST_F(RoundRobinTest, HealthChecking) { do { ResetCounters(); for (int i = 0; i < kNumServers; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } } while (servers_[1]->service_.request_count() != 2 && servers_[2]->service_.request_count() != 2); @@ -1957,7 +1985,8 @@ TEST_F(RoundRobinTest, HealthChecking) { servers_[1]->SetServingStatus("health_check_service_name", false); servers_[2]->SetServingStatus("health_check_service_name", false); EXPECT_TRUE(WaitForChannelNotReady(channel.get())); - CheckRpcSendFailure(stub); + CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE, + "connections to all backends failing"); // Clean up. EnableDefaultHealthCheckService(false); } @@ -1978,14 +2007,14 @@ TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) { auto channel = BuildChannel("round_robin", response_generator, args); auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); - WaitForServer(stub, 0, DEBUG_LOCATION); + WaitForServer(DEBUG_LOCATION, stub, 0); // Stop server 0 and send a new resolver result to ensure that RR // checks each subchannel's state. servers_[0]->Shutdown(); response_generator.SetNextResolution(GetServersPorts()); // Send a bunch more RPCs. for (size_t i = 0; i < 100; i++) { - SendRpc(stub); + CheckRpcSendOk(DEBUG_LOCATION, stub); } } @@ -2013,13 +2042,14 @@ TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) { // First channel should not become READY, because health checks should be // failing. EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1)); - CheckRpcSendFailure(stub1); + CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE, + "connections to all backends failing"); // Second channel should be READY. EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1)); - CheckRpcSendOk(stub2, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub2); // Enable health checks on the backend and wait for channel 1 to succeed. servers_[0]->SetServingStatus("health_check_service_name", true); - CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */); + CheckRpcSendOk(DEBUG_LOCATION, stub1, true /* wait_for_ready */); // Check that we created only one subchannel to the backend. EXPECT_EQ(1UL, servers_[0]->service_.clients().size()); // Clean up. @@ -2056,13 +2086,14 @@ TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) { // First channel should not become READY, because health checks should be // failing. EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1)); - CheckRpcSendFailure(stub1); + CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE, + "connections to all backends failing"); // Second channel should be READY. EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1)); - CheckRpcSendOk(stub2, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub2); // Enable health checks for channel 1 and wait for it to succeed. servers_[0]->SetServingStatus("health_check_service_name", true); - CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */); + CheckRpcSendOk(DEBUG_LOCATION, stub1, true /* wait_for_ready */); // Check that we created only one subchannel to the backend. EXPECT_EQ(1UL, servers_[0]->service_.clients().size()); // Clean up. @@ -2164,7 +2195,7 @@ TEST_F(ClientLbPickArgsTest, Basic) { // Check LB policy name for the channel. EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName()); // Now send an RPC and check that the picker sees the expected data. - CheckRpcSendOk(stub, DEBUG_LOCATION, /*wait_for_ready=*/true); + CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true); auto pick_args_seen_list = args_seen_list(); EXPECT_THAT(pick_args_seen_list, ::testing::ElementsAre(::testing::AllOf( @@ -2267,7 +2298,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, StatusOk) { auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); // Send an OK RPC. - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); // Check LB policy name for the channel. EXPECT_EQ("intercept_trailing_metadata_lb", channel->GetLoadBalancingPolicyName()); @@ -2286,9 +2317,8 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, StatusFailed) { auto* expected_error = request.mutable_param()->mutable_expected_error(); expected_error->set_code(GRPC_STATUS_PERMISSION_DENIED); expected_error->set_error_message("bummer, man"); - Status status; - SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000, &status, - /*wait_for_ready=*/false, &request); + Status status = SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000, + /*wait_for_ready=*/false, &request); EXPECT_EQ(status.error_code(), StatusCode::PERMISSION_DENIED); EXPECT_EQ(status.error_message(), "bummer, man"); absl::Status status_seen_by_lb = last_status(); @@ -2330,7 +2360,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) { auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); for (size_t i = 0; i < kNumRpcs; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } // Check LB policy name for the channel. EXPECT_EQ("intercept_trailing_metadata_lb", @@ -2372,7 +2402,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) { auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); for (size_t i = 0; i < kNumRpcs; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); } // Check LB policy name for the channel. EXPECT_EQ("intercept_trailing_metadata_lb", @@ -2407,7 +2437,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) { auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); for (size_t i = 0; i < kNumRpcs; ++i) { - CheckRpcSendOk(stub, DEBUG_LOCATION, false, &load_report); + CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report); auto actual = backend_load_report(); ASSERT_TRUE(actual.has_value()); // TODO(roth): Change this to use EqualsProto() once that becomes @@ -2502,7 +2532,7 @@ TEST_F(ClientLbAddressTest, Basic) { response_generator.SetNextResolution(GetServersPorts(), nullptr, kAttributeKey, absl::make_unique("foo")); - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); // Check LB policy name for the channel. EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName()); // Make sure that the attributes wind up on the subchannels. @@ -2576,7 +2606,7 @@ TEST_F(OobBackendMetricTest, Basic) { auto stub = BuildStub(channel); response_generator.SetNextResolution(GetServersPorts()); // Send an OK RPC. - CheckRpcSendOk(stub, DEBUG_LOCATION); + CheckRpcSendOk(DEBUG_LOCATION, stub); // Check LB policy name for the channel. EXPECT_EQ("oob_backend_metric_test_lb", channel->GetLoadBalancingPolicyName());