diff --git a/test/cpp/end2end/cfstream_test.cc b/test/cpp/end2end/cfstream_test.cc index 63d76e96f8b..59cf98ffc20 100644 --- a/test/cpp/end2end/cfstream_test.cc +++ b/test/cpp/end2end/cfstream_test.cc @@ -45,6 +45,7 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/end2end/test_service_impl.h" +#include "test/cpp/util/test_credentials_provider.h" #ifdef GRPC_CFSTREAM using grpc::ClientAsyncResponseReader; @@ -57,13 +58,19 @@ namespace grpc { namespace testing { namespace { -class CFStreamTest : public ::testing::Test { +struct TestScenario { + TestScenario(const grpc::string& creds_type, const grpc::string& content) + : credentials_type(creds_type), message_content(content) {} + const grpc::string credentials_type; + const grpc::string message_content; +}; + +class CFStreamTest : public ::testing::TestWithParam { protected: CFStreamTest() : server_host_("grpctest"), interface_("lo0"), - ipv4_address_("10.0.0.1"), - kRequestMessage_("🖖") {} + ipv4_address_("10.0.0.1") {} void DNSUp() { std::ostringstream cmd; @@ -118,7 +125,7 @@ class CFStreamTest : public ::testing::Test { void StartServer() { port_ = grpc_pick_unused_port_or_die(); - server_.reset(new ServerData(port_)); + server_.reset(new ServerData(port_, GetParam().credentials_type)); server_->Start(server_host_); } void StopServer() { server_->Shutdown(); } @@ -131,8 +138,10 @@ class CFStreamTest : public ::testing::Test { std::shared_ptr BuildChannel() { std::ostringstream server_address; server_address << server_host_ << ":" << port_; - return CreateCustomChannel( - server_address.str(), InsecureChannelCredentials(), ChannelArguments()); + ChannelArguments args; + auto channel_creds = GetCredentialsProvider()->GetChannelCredentials( + GetParam().credentials_type, &args); + return CreateCustomChannel(server_address.str(), channel_creds, args); } void SendRpc( @@ -140,11 +149,12 @@ class CFStreamTest : public ::testing::Test { bool expect_success = false) { auto response = std::unique_ptr(new EchoResponse()); EchoRequest request; - request.set_message(kRequestMessage_); + auto& msg = GetParam().message_content; + request.set_message(msg); ClientContext context; Status status = stub->Echo(&context, request, response.get()); if (status.ok()) { - gpr_log(GPR_DEBUG, "RPC returned %s\n", response->message().c_str()); + EXPECT_EQ(msg, response->message()); } else { gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str()); } @@ -156,9 +166,7 @@ class CFStreamTest : public ::testing::Test { const std::unique_ptr& stub, RequestParams param = RequestParams()) { EchoRequest request; - auto msg = std::to_string(ctr.load()); - request.set_message(msg); - ctr++; + request.set_message(GetParam().message_content); *request.mutable_param() = std::move(param); AsyncClientCall* call = new AsyncClientCall; @@ -166,7 +174,6 @@ class CFStreamTest : public ::testing::Test { stub->PrepareAsyncEcho(&call->context, request, &cq_); call->response_reader->StartCall(); - gpr_log(GPR_DEBUG, "Sending request: %s", msg.c_str()); call->response_reader->Finish(&call->reply, &call->status, (void*)call); } @@ -206,12 +213,14 @@ class CFStreamTest : public ::testing::Test { private: struct ServerData { int port_; + const grpc::string creds_; std::unique_ptr server_; TestServiceImpl service_; std::unique_ptr thread_; bool server_ready_ = false; - explicit ServerData(int port) { port_ = port; } + ServerData(int port, const grpc::string& creds) + : port_(port), creds_(creds) {} void Start(const grpc::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); @@ -230,8 +239,9 @@ class CFStreamTest : public ::testing::Test { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; - builder.AddListeningPort(server_address.str(), - InsecureServerCredentials()); + auto server_creds = + GetCredentialsProvider()->GetServerCredentials(creds_); + builder.AddListeningPort(server_address.str(), server_creds); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); std::lock_guard lock(*mu); @@ -251,13 +261,44 @@ class CFStreamTest : public ::testing::Test { const grpc::string ipv4_address_; std::unique_ptr server_; int port_; - const grpc::string kRequestMessage_; - std::atomic_int ctr{0}; }; +std::vector CreateTestScenarios() { + std::vector scenarios; + std::vector credentials_types; + std::vector messages; + + credentials_types.push_back(kInsecureCredentialsType); + auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList(); + for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) { + credentials_types.push_back(*sec); + } + + messages.push_back("🖖"); + for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024; k *= 32) { + grpc::string big_msg; + for (size_t i = 0; i < k * 1024; ++i) { + char c = 'a' + (i % 26); + big_msg += c; + } + messages.push_back(big_msg); + } + for (auto cred = credentials_types.begin(); cred != credentials_types.end(); + ++cred) { + for (auto msg = messages.begin(); msg != messages.end(); msg++) { + scenarios.emplace_back(*cred, *msg); + } + } + + return scenarios; +} + +INSTANTIATE_TEST_CASE_P(CFStreamTest, CFStreamTest, + ::testing::ValuesIn(CreateTestScenarios())); + // gRPC should automatically detech network flaps (without enabling keepalives) // when CFStream is enabled -TEST_F(CFStreamTest, NetworkTransition) { +TEST_P(CFStreamTest, NetworkTransition) { auto channel = BuildChannel(); auto stub = BuildStub(channel); // Channel should be in READY state after we send an RPC @@ -293,7 +334,7 @@ TEST_F(CFStreamTest, NetworkTransition) { } // Network flaps while RPCs are in flight -TEST_F(CFStreamTest, NetworkFlapRpcsInFlight) { +TEST_P(CFStreamTest, NetworkFlapRpcsInFlight) { auto channel = BuildChannel(); auto stub = BuildStub(channel); std::atomic_int rpcs_sent{0}; @@ -318,9 +359,7 @@ TEST_F(CFStreamTest, NetworkFlapRpcsInFlight) { ++total_completions; GPR_ASSERT(ok); AsyncClientCall* call = static_cast(got_tag); - if (call->status.ok()) { - gpr_log(GPR_DEBUG, "RPC response: %s", call->reply.message().c_str()); - } else { + if (!call->status.ok()) { gpr_log(GPR_DEBUG, "RPC failed with error: %s", call->status.error_message().c_str()); // Bring network up when RPCs start failing @@ -347,7 +386,7 @@ TEST_F(CFStreamTest, NetworkFlapRpcsInFlight) { // Send a bunch of RPCs, some of which are expected to fail. // We should get back a response for all RPCs -TEST_F(CFStreamTest, ConcurrentRpc) { +TEST_P(CFStreamTest, ConcurrentRpc) { auto channel = BuildChannel(); auto stub = BuildStub(channel); std::atomic_int rpcs_sent{0}; @@ -361,9 +400,7 @@ TEST_F(CFStreamTest, ConcurrentRpc) { ++total_completions; GPR_ASSERT(ok); AsyncClientCall* call = static_cast(got_tag); - if (call->status.ok()) { - gpr_log(GPR_DEBUG, "RPC response: %s", call->reply.message().c_str()); - } else { + if (!call->status.ok()) { gpr_log(GPR_DEBUG, "RPC failed: %s", call->status.error_message().c_str()); // Bring network up when RPCs start failing