diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 2659b0e2132..49a38ce544b 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -62,7 +62,6 @@ using std::chrono::system_clock; namespace grpc { namespace testing { - namespace { const char* kServerCancelAfterReads = "cancel_after_reads"; @@ -195,8 +194,6 @@ const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll"; const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity"; -} // namespace - class Proxy : public ::grpc::cpp::test::util::TestService::Service { public: Proxy(std::shared_ptr channel) @@ -353,14 +350,24 @@ class TestServiceImplDupPkg } }; -/* Param is whether or not to use a proxy -- some tests use TEST_F as they don't - need this functionality */ -class End2endTest : public ::testing::TestWithParam { +class TestScenario { + public: + TestScenario(bool proxy, bool tls) : use_proxy(proxy), use_tls(tls) {} + void Log() const { + gpr_log(GPR_INFO, "Scenario: proxy %d, tls %d", use_proxy, use_tls); + } + bool use_proxy; + bool use_tls; +}; + +class End2endTest : public ::testing::TestWithParam { protected: End2endTest() : is_server_started_(false), kMaxMessageSize_(8192), - special_service_("special") {} + special_service_("special") { + GetParam().Log(); + } void TearDown() GRPC_OVERRIDE { if (is_server_started_) { @@ -374,13 +381,16 @@ class End2endTest : public ::testing::TestWithParam { server_address_ << "127.0.0.1:" << port; // Setup server ServerBuilder builder; - SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key, - test_server1_cert}; - SslServerCredentialsOptions ssl_opts; - ssl_opts.pem_root_certs = ""; - ssl_opts.pem_key_cert_pairs.push_back(pkcp); - auto server_creds = SslServerCredentials(ssl_opts); - server_creds->SetAuthMetadataProcessor(processor); + auto server_creds = InsecureServerCredentials(); + if (GetParam().use_tls) { + SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key, + test_server1_cert}; + SslServerCredentialsOptions ssl_opts; + ssl_opts.pem_root_certs = ""; + ssl_opts.pem_key_cert_pairs.push_back(pkcp); + server_creds = SslServerCredentials(ssl_opts); + server_creds->SetAuthMetadataProcessor(processor); + } builder.AddListeningPort(server_address_.str(), server_creds); builder.RegisterService(&service_); builder.RegisterService("foo.test.youtube.com", &special_service_); @@ -396,17 +406,20 @@ class End2endTest : public ::testing::TestWithParam { StartServer(std::shared_ptr()); } EXPECT_TRUE(is_server_started_); - SslCredentialsOptions ssl_opts = {test_root_cert, "", ""}; ChannelArguments args; - args.SetSslTargetNameOverride("foo.test.google.fr"); + auto channel_creds = InsecureCredentials(); + if (GetParam().use_tls) { + SslCredentialsOptions ssl_opts = {test_root_cert, "", ""}; + args.SetSslTargetNameOverride("foo.test.google.fr"); + channel_creds = SslCredentials(ssl_opts); + } args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test"); - channel_ = CreateCustomChannel(server_address_.str(), - SslCredentials(ssl_opts), args); + channel_ = CreateCustomChannel(server_address_.str(), channel_creds, args); } - void ResetStub(bool use_proxy) { + void ResetStub() { ResetChannel(); - if (use_proxy) { + if (GetParam().use_proxy) { proxy_service_.reset(new Proxy(channel_)); int port = grpc_pick_unused_port_or_die(); std::ostringstream proxyaddr; @@ -450,124 +463,8 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub, } } -TEST_F(End2endTest, SimpleRpcWithHost) { - ResetStub(false); - - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - context.set_authority("foo.test.youtube.com"); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(response.has_param()); - EXPECT_EQ("special", response.param().host()); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, SimpleRpc) { - ResetStub(GetParam()); - SendRpc(stub_.get(), 1); -} - -TEST_P(End2endTest, MultipleRpcs) { - ResetStub(GetParam()); - std::vector threads; - for (int i = 0; i < 10; ++i) { - threads.push_back(new std::thread(SendRpc, stub_.get(), 10)); - } - for (int i = 0; i < 10; ++i) { - threads[i]->join(); - delete threads[i]; - } -} - -// Set a 10us deadline and make sure proper error is returned. -TEST_P(End2endTest, RpcDeadlineExpires) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::microseconds(10); - context.set_deadline(deadline); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code()); -} - -// Set a long but finite deadline. -TEST_P(End2endTest, RpcLongDeadline) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::hours(1); - context.set_deadline(deadline); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); -} - -// Ask server to echo back the deadline it sees. -TEST_P(End2endTest, EchoDeadline) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - request.mutable_param()->set_echo_deadline(true); - - ClientContext context; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(100); - context.set_deadline(deadline); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - gpr_timespec sent_deadline; - Timepoint2Timespec(deadline, &sent_deadline); - // Allow 1 second error. - EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 1); - EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1); -} - -// Ask server to echo back the deadline it sees. The rpc has no deadline. -TEST_P(End2endTest, EchoDeadlineForNoDeadlineRpc) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - request.mutable_param()->set_echo_deadline(true); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - EXPECT_EQ(response.param().request_deadline(), - gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec); -} - -TEST_P(End2endTest, UnimplementedRpc) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - Status s = stub_->Unimplemented(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED); - EXPECT_EQ(s.error_message(), ""); - EXPECT_EQ(response.message(), ""); -} - -TEST_F(End2endTest, RequestStreamOneRequest) { - ResetStub(false); +TEST_P(End2endTest, RequestStreamOneRequest) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -581,8 +478,8 @@ TEST_F(End2endTest, RequestStreamOneRequest) { EXPECT_TRUE(s.ok()); } -TEST_F(End2endTest, RequestStreamTwoRequests) { - ResetStub(false); +TEST_P(End2endTest, RequestStreamTwoRequests) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -597,8 +494,8 @@ TEST_F(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(s.ok()); } -TEST_F(End2endTest, ResponseStream) { - ResetStub(false); +TEST_P(End2endTest, ResponseStream) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -617,8 +514,8 @@ TEST_F(End2endTest, ResponseStream) { EXPECT_TRUE(s.ok()); } -TEST_F(End2endTest, BidiStream) { - ResetStub(false); +TEST_P(End2endTest, BidiStream) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -650,8 +547,8 @@ TEST_F(End2endTest, BidiStream) { // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. -TEST_F(End2endTest, DiffPackageServices) { - ResetStub(false); +TEST_P(End2endTest, DiffPackageServices) { + ResetStub(); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -670,33 +567,6 @@ TEST_F(End2endTest, DiffPackageServices) { EXPECT_TRUE(s.ok()); } -// rpc and stream should fail on bad credentials. -TEST_F(End2endTest, BadCredentials) { - std::shared_ptr bad_creds = GoogleRefreshTokenCredentials(""); - EXPECT_EQ(static_cast(nullptr), bad_creds.get()); - std::shared_ptr channel = - CreateChannel(server_address_.str(), bad_creds); - std::unique_ptr stub( - grpc::cpp::test::util::TestService::NewStub(channel)); - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("Hello"); - - Status s = stub->Echo(&context, request, &response); - EXPECT_EQ("", response.message()); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(StatusCode::INVALID_ARGUMENT, s.error_code()); - EXPECT_EQ("Invalid credentials.", s.error_message()); - - ClientContext context2; - auto stream = stub->BidiStream(&context2); - s = stream->Finish(); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(StatusCode::INVALID_ARGUMENT, s.error_code()); - EXPECT_EQ("Invalid credentials.", s.error_message()); -} - void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(delay_us, GPR_TIMESPAN))); @@ -705,40 +575,9 @@ void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) { context->TryCancel(); } -// Client cancels rpc after 10ms -TEST_P(End2endTest, ClientCancelsRpc) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - const int kCancelDelayUs = 10 * 1000; - request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs); - - ClientContext context; - std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_); - Status s = stub_->Echo(&context, request, &response); - cancel_thread.join(); - EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); - EXPECT_EQ(s.error_message(), "Cancelled"); -} - -// Server cancels rpc after 1ms -TEST_P(End2endTest, ServerCancelsRpc) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - request.mutable_param()->set_server_cancel_after_us(1000); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); - EXPECT_TRUE(s.error_message().empty()); -} - // Client cancels request stream after sending two messages -TEST_F(End2endTest, ClientCancelsRequestStream) { - ResetStub(false); +TEST_P(End2endTest, ClientCancelsRequestStream) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -757,8 +596,8 @@ TEST_F(End2endTest, ClientCancelsRequestStream) { } // Client cancels server stream after sending some messages -TEST_F(End2endTest, ClientCancelsResponseStream) { - ResetStub(false); +TEST_P(End2endTest, ClientCancelsResponseStream) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -789,8 +628,8 @@ TEST_F(End2endTest, ClientCancelsResponseStream) { } // Client cancels bidi stream after sending some messages -TEST_F(End2endTest, ClientCancelsBidi) { - ResetStub(false); +TEST_P(End2endTest, ClientCancelsBidi) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -821,8 +660,8 @@ TEST_F(End2endTest, ClientCancelsBidi) { EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); } -TEST_F(End2endTest, RpcMaxMessageSize) { - ResetStub(false); +TEST_P(End2endTest, RpcMaxMessageSize) { + ResetStub(); EchoRequest request; EchoResponse response; request.set_message(string(kMaxMessageSize_ * 2, 'a')); @@ -832,110 +671,347 @@ TEST_F(End2endTest, RpcMaxMessageSize) { EXPECT_FALSE(s.ok()); } -bool MetadataContains( - const std::multimap& metadata, - const grpc::string& key, const grpc::string& value) { - int count = 0; +// Client sends 20 requests and the server returns CANCELLED status after +// reading 10 requests. +TEST_P(End2endTest, RequestStreamServerEarlyCancelTest) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; - for (std::multimap::const_iterator iter = - metadata.begin(); - iter != metadata.end(); ++iter) { - if (ToString(iter->first) == key && ToString(iter->second) == value) { - count++; - } + context.AddMetadata(kServerCancelAfterReads, "10"); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + int send_messages = 20; + while (send_messages > 0) { + EXPECT_TRUE(stream->Write(request)); + send_messages--; } - return count == 1; + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(s.error_code(), StatusCode::CANCELLED); } -TEST_F(End2endTest, SetPerCallCredentials) { - ResetStub(false); - EchoRequest request; - EchoResponse response; - ClientContext context; - std::shared_ptr creds = - GoogleIAMCredentials("fake_token", "fake_selector"); - context.set_credentials(creds); - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); +void ReaderThreadFunc(ClientReaderWriter* stream, + gpr_event* ev) { + EchoResponse resp; + gpr_event_set(ev, (void*)1); + while (stream->Read(&resp)) { + gpr_log(GPR_INFO, "Read message"); + } +} - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(request.message(), response.message()); +// Run a Read and a WritesDone simultaneously. +TEST_P(End2endTest, SimultaneousReadWritesDone) { + ResetStub(); + ClientContext context; + gpr_event ev; + gpr_event_init(&ev); + auto stream = stub_->BidiStream(&context); + std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev); + gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); + stream->WritesDone(); + Status s = stream->Finish(); EXPECT_TRUE(s.ok()); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, - "fake_token")); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, - "fake_selector")); + reader_thread.join(); } -TEST_F(End2endTest, InsecurePerCallCredentials) { - ResetStub(false); - EchoRequest request; - EchoResponse response; - ClientContext context; - std::shared_ptr creds = InsecureCredentials(); - context.set_credentials(creds); - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); +TEST_P(End2endTest, ChannelState) { + ResetStub(); + // Start IDLE + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); - EXPECT_EQ("Failed to set credentials to rpc.", s.error_message()); + // Did not ask to connect, no state change. + CompletionQueue cq; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(10); + channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL); + void* tag; + bool ok = true; + cq.Next(&tag, &ok); + EXPECT_FALSE(ok); + + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true)); + EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE, + gpr_inf_future(GPR_CLOCK_REALTIME))); + EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false)); } -TEST_F(End2endTest, OverridePerCallCredentials) { - ResetStub(false); - EchoRequest request; - EchoResponse response; - ClientContext context; - std::shared_ptr creds1 = - GoogleIAMCredentials("fake_token1", "fake_selector1"); - context.set_credentials(creds1); - std::shared_ptr creds2 = - GoogleIAMCredentials("fake_token2", "fake_selector2"); - context.set_credentials(creds2); +// Takes 10s. +TEST_P(End2endTest, ChannelStateTimeout) { + if (GetParam().use_tls) { + return; + } + int port = grpc_pick_unused_port_or_die(); + std::ostringstream server_address; + server_address << "127.0.0.1:" << port; + // Channel to non-existing server + auto channel = CreateChannel(server_address.str(), InsecureCredentials()); + // Start IDLE + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true)); + + auto state = GRPC_CHANNEL_IDLE; + for (int i = 0; i < 10; i++) { + channel->WaitForStateChange( + state, std::chrono::system_clock::now() + std::chrono::seconds(1)); + state = channel->GetState(false); + } +} + +// Talking to a non-existing service. +TEST_P(End2endTest, NonExistingService) { + ResetChannel(); + std::unique_ptr stub; + stub = grpc::cpp::test::util::UnimplementedService::NewStub(channel_); + + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + Status s = stub->Unimplemented(&context, request, &response); + EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code()); + EXPECT_EQ("", s.error_message()); +} + +////////////////////////////////////////////////////////////////////////// +// Test with and without a proxy. +class ProxyEnd2endTest : public End2endTest { + protected: +}; + +TEST_P(ProxyEnd2endTest, SimpleRpc) { + ResetStub(); + SendRpc(stub_.get(), 1); +} + +TEST_P(ProxyEnd2endTest, MultipleRpcs) { + ResetStub(); + std::vector threads; + for (int i = 0; i < 10; ++i) { + threads.push_back(new std::thread(SendRpc, stub_.get(), 10)); + } + for (int i = 0; i < 10; ++i) { + threads[i]->join(); + delete threads[i]; + } +} + +// Set a 10us deadline and make sure proper error is returned. +TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { + ResetStub(); + EchoRequest request; + EchoResponse response; request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); + ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::microseconds(10); + context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, - "fake_token2")); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, - "fake_selector2")); - EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, - "fake_token1")); - EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, - "fake_selector1")); - EXPECT_EQ(request.message(), response.message()); + EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code()); +} + +// Set a long but finite deadline. +TEST_P(ProxyEnd2endTest, RpcLongDeadline) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::hours(1); + context.set_deadline(deadline); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); EXPECT_TRUE(s.ok()); } -TEST_F(End2endTest, NonBlockingAuthMetadataPluginFailure) { - ResetStub(false); +// Ask server to echo back the deadline it sees. +TEST_P(ProxyEnd2endTest, EchoDeadline) { + ResetStub(); EchoRequest request; EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_echo_deadline(true); + ClientContext context; - context.set_credentials( - MetadataCredentialsFromPlugin(std::unique_ptr( - new TestMetadataCredentialsPlugin( - "Does not matter, will fail anyway (see 3rd param)", false, - false)))); + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(100); + context.set_deadline(deadline); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + gpr_timespec sent_deadline; + Timepoint2Timespec(deadline, &sent_deadline); + // Allow 1 second error. + EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 1); + EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1); +} + +// Ask server to echo back the deadline it sees. The rpc has no deadline. +TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) { + ResetStub(); + EchoRequest request; + EchoResponse response; request.set_message("Hello"); + request.mutable_param()->set_echo_deadline(true); + ClientContext context; Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(response.param().request_deadline(), + gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec); +} + +TEST_P(ProxyEnd2endTest, UnimplementedRpc) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + Status s = stub_->Unimplemented(&context, request, &response); EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); + EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED); + EXPECT_EQ(s.error_message(), ""); + EXPECT_EQ(response.message(), ""); } -TEST_F(End2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { - auto* processor = new TestAuthMetadataProcessor(false); +// Client cancels rpc after 10ms +TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + const int kCancelDelayUs = 10 * 1000; + request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs); + + ClientContext context; + std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_); + Status s = stub_->Echo(&context, request, &response); + cancel_thread.join(); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + EXPECT_EQ(s.error_message(), "Cancelled"); +} + +// Server cancels rpc after 1ms +TEST_P(ProxyEnd2endTest, ServerCancelsRpc) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_server_cancel_after_us(1000); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + EXPECT_TRUE(s.error_message().empty()); +} + +// Make the response larger than the flow control window. +TEST_P(ProxyEnd2endTest, HugeResponse) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("huge response"); + const size_t kResponseSize = 1024 * (1024 + 10); + request.mutable_param()->set_response_message_length(kResponseSize); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(kResponseSize, response.message().size()); + EXPECT_TRUE(s.ok()); +} + +TEST_P(ProxyEnd2endTest, Peer) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("hello"); + request.mutable_param()->set_echo_peer(true); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(CheckIsLocalhost(response.param().peer())); + EXPECT_TRUE(CheckIsLocalhost(context.peer())); +} + +////////////////////////////////////////////////////////////////////////// +class SecureEnd2endTest : public End2endTest { + protected: + SecureEnd2endTest() { + GPR_ASSERT(!GetParam().use_proxy); + GPR_ASSERT(GetParam().use_tls); + } +}; + +TEST_P(SecureEnd2endTest, SimpleRpcWithHost) { + ResetStub(); + + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + context.set_authority("foo.test.youtube.com"); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(response.has_param()); + EXPECT_EQ("special", response.param().host()); + EXPECT_TRUE(s.ok()); +} + +// rpc and stream should fail on bad credentials. +TEST_P(SecureEnd2endTest, BadCredentials) { + std::shared_ptr bad_creds = GoogleRefreshTokenCredentials(""); + EXPECT_EQ(static_cast(nullptr), bad_creds.get()); + std::shared_ptr channel = + CreateChannel(server_address_.str(), bad_creds); + std::unique_ptr stub( + grpc::cpp::test::util::TestService::NewStub(channel)); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("Hello"); + + Status s = stub->Echo(&context, request, &response); + EXPECT_EQ("", response.message()); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(StatusCode::INVALID_ARGUMENT, s.error_code()); + EXPECT_EQ("Invalid credentials.", s.error_message()); + + ClientContext context2; + auto stream = stub->BidiStream(&context2); + s = stream->Finish(); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(StatusCode::INVALID_ARGUMENT, s.error_code()); + EXPECT_EQ("Invalid credentials.", s.error_message()); +} + +bool MetadataContains( + const std::multimap& metadata, + const grpc::string& key, const grpc::string& value) { + int count = 0; + + for (std::multimap::const_iterator iter = + metadata.begin(); + iter != metadata.end(); ++iter) { + if (ToString(iter->first) == key && ToString(iter->second) == value) { + count++; + } + } + return count == 1; +} + +TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { + auto* processor = new TestAuthMetadataProcessor(true); StartServer(std::shared_ptr(processor)); - ResetStub(false); + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -955,10 +1031,10 @@ TEST_F(End2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy)); } -TEST_F(End2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { - auto* processor = new TestAuthMetadataProcessor(false); +TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { + auto* processor = new TestAuthMetadataProcessor(true); StartServer(std::shared_ptr(processor)); - ResetStub(false); + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -969,16 +1045,83 @@ TEST_F(End2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { EXPECT_FALSE(s.ok()); EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); } +TEST_P(SecureEnd2endTest, SetPerCallCredentials) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + std::shared_ptr creds = + GoogleIAMCredentials("fake_token", "fake_selector"); + context.set_credentials(creds); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); -TEST_F(End2endTest, BlockingAuthMetadataPluginFailure) { - ResetStub(false); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, + "fake_token")); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, + "fake_selector")); +} + +TEST_P(SecureEnd2endTest, InsecurePerCallCredentials) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + std::shared_ptr creds = InsecureCredentials(); + context.set_credentials(creds); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + EXPECT_EQ("Failed to set credentials to rpc.", s.error_message()); +} + +TEST_P(SecureEnd2endTest, OverridePerCallCredentials) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + std::shared_ptr creds1 = + GoogleIAMCredentials("fake_token1", "fake_selector1"); + context.set_credentials(creds1); + std::shared_ptr creds2 = + GoogleIAMCredentials("fake_token2", "fake_selector2"); + context.set_credentials(creds2); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, + "fake_token2")); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, + "fake_selector2")); + EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, + "fake_token1")); + EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, + "fake_selector1")); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); +} + +TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; context.set_credentials( MetadataCredentialsFromPlugin(std::unique_ptr( new TestMetadataCredentialsPlugin( - "Does not matter, will fail anyway (see 3rd param)", true, + "Does not matter, will fail anyway (see 3rd param)", false, false)))); request.set_message("Hello"); @@ -987,10 +1130,10 @@ TEST_F(End2endTest, BlockingAuthMetadataPluginFailure) { EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); } -TEST_F(End2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { - auto* processor = new TestAuthMetadataProcessor(true); +TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { + auto* processor = new TestAuthMetadataProcessor(false); StartServer(std::shared_ptr(processor)); - ResetStub(false); + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -1010,10 +1153,10 @@ TEST_F(End2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy)); } -TEST_F(End2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { - auto* processor = new TestAuthMetadataProcessor(true); +TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { + auto* processor = new TestAuthMetadataProcessor(false); StartServer(std::shared_ptr(processor)); - ResetStub(false); + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; @@ -1025,29 +1168,25 @@ TEST_F(End2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); } -// Client sends 20 requests and the server returns CANCELLED status after -// reading 10 requests. -TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) { - ResetStub(false); +TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) { + ResetStub(); EchoRequest request; EchoResponse response; ClientContext context; + context.set_credentials( + MetadataCredentialsFromPlugin(std::unique_ptr( + new TestMetadataCredentialsPlugin( + "Does not matter, will fail anyway (see 3rd param)", true, + false)))); + request.set_message("Hello"); - context.AddMetadata(kServerCancelAfterReads, "10"); - auto stream = stub_->RequestStream(&context, &response); - request.set_message("hello"); - int send_messages = 20; - while (send_messages > 0) { - EXPECT_TRUE(stream->Write(request)); - send_messages--; - } - stream->WritesDone(); - Status s = stream->Finish(); - EXPECT_EQ(s.error_code(), StatusCode::CANCELLED); + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); } -TEST_F(End2endTest, ClientAuthContext) { - ResetStub(false); +TEST_P(SecureEnd2endTest, ClientAuthContext) { + ResetStub(); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -1072,119 +1211,20 @@ TEST_F(End2endTest, ClientAuthContext) { EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2])); } -// Make the response larger than the flow control window. -TEST_P(End2endTest, HugeResponse) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("huge response"); - const size_t kResponseSize = 1024 * (1024 + 10); - request.mutable_param()->set_response_message_length(kResponseSize); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(kResponseSize, response.message().size()); - EXPECT_TRUE(s.ok()); -} - -namespace { -void ReaderThreadFunc(ClientReaderWriter* stream, - gpr_event* ev) { - EchoResponse resp; - gpr_event_set(ev, (void*)1); - while (stream->Read(&resp)) { - gpr_log(GPR_INFO, "Read message"); - } -} -} // namespace - -// Run a Read and a WritesDone simultaneously. -TEST_F(End2endTest, SimultaneousReadWritesDone) { - ResetStub(false); - ClientContext context; - gpr_event ev; - gpr_event_init(&ev); - auto stream = stub_->BidiStream(&context); - std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev); - gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); - stream->WritesDone(); - Status s = stream->Finish(); - EXPECT_TRUE(s.ok()); - reader_thread.join(); -} - -TEST_P(End2endTest, Peer) { - ResetStub(GetParam()); - EchoRequest request; - EchoResponse response; - request.set_message("hello"); - request.mutable_param()->set_echo_peer(true); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - EXPECT_TRUE(CheckIsLocalhost(response.param().peer())); - EXPECT_TRUE(CheckIsLocalhost(context.peer())); -} - -TEST_F(End2endTest, ChannelState) { - ResetStub(false); - // Start IDLE - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); - - // Did not ask to connect, no state change. - CompletionQueue cq; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(10); - channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL); - void* tag; - bool ok = true; - cq.Next(&tag, &ok); - EXPECT_FALSE(ok); - - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true)); - EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE, - gpr_inf_future(GPR_CLOCK_REALTIME))); - EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false)); -} - -// Takes 10s. -TEST_F(End2endTest, ChannelStateTimeout) { - int port = grpc_pick_unused_port_or_die(); - std::ostringstream server_address; - server_address << "127.0.0.1:" << port; - // Channel to non-existing server - auto channel = CreateChannel(server_address.str(), InsecureCredentials()); - // Start IDLE - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true)); - - auto state = GRPC_CHANNEL_IDLE; - for (int i = 0; i < 10; i++) { - channel->WaitForStateChange( - state, std::chrono::system_clock::now() + std::chrono::seconds(1)); - state = channel->GetState(false); - } -} - -// Talking to a non-existing service. -TEST_F(End2endTest, NonExistingService) { - ResetChannel(); - std::unique_ptr stub; - stub = grpc::cpp::test::util::UnimplementedService::NewStub(channel_); +INSTANTIATE_TEST_CASE_P(End2end, End2endTest, + ::testing::Values(TestScenario(false, true), + TestScenario(false, false))); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); +INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest, + ::testing::Values(TestScenario(true, true), + TestScenario(true, false), + TestScenario(false, true), + TestScenario(false, false))); - ClientContext context; - Status s = stub->Unimplemented(&context, request, &response); - EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code()); - EXPECT_EQ("", s.error_message()); -} - -INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(false, true)); +INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest, + ::testing::Values(TestScenario(false, true))); +} // namespace } // namespace testing } // namespace grpc