/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include #include #include #include #include #include #include #include #include #include #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/end2end/interceptors_util.h" #include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/byte_buffer_proto_helper.h" #include namespace grpc { namespace testing { namespace { /* This interceptor does nothing. Just keeps a global count on the number of * times it was invoked. */ class DummyInterceptor : public experimental::Interceptor { public: DummyInterceptor(experimental::ServerRpcInfo* info) {} virtual void Intercept(experimental::InterceptorBatchMethods* methods) { if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { num_times_run_++; } else if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA)) { num_times_run_reverse_++; } methods->Proceed(); } static void Reset() { num_times_run_.store(0); num_times_run_reverse_.store(0); } static int GetNumTimesRun() { EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load()); return num_times_run_.load(); } private: static std::atomic num_times_run_; static std::atomic num_times_run_reverse_; }; std::atomic DummyInterceptor::num_times_run_; std::atomic DummyInterceptor::num_times_run_reverse_; class DummyInterceptorFactory : public experimental::ServerInterceptorFactoryInterface { public: virtual experimental::Interceptor* CreateServerInterceptor( experimental::ServerRpcInfo* info) override { return new DummyInterceptor(info); } }; class LoggingInterceptor : public experimental::Interceptor { public: LoggingInterceptor(experimental::ServerRpcInfo* info) { info_ = info; } virtual void Intercept(experimental::InterceptorBatchMethods* methods) { // gpr_log(GPR_ERROR, "ran this"); if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { auto* map = methods->GetSendInitialMetadata(); // Got nothing better to do here for now EXPECT_EQ(map->size(), static_cast(0)); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { EchoRequest req; auto* buffer = methods->GetSendMessage(); auto copied_buffer = *buffer; SerializationTraits::Deserialize(&copied_buffer, &req); EXPECT_TRUE(req.message().find("Hello") == 0); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_STATUS)) { auto* map = methods->GetSendTrailingMetadata(); bool found = false; // Check that we received the metadata as an echo for (const auto& pair : *map) { found = pair.first.find("testkey") == 0 && pair.second.find("testvalue") == 0; if (found) break; } EXPECT_EQ(found, true); auto status = methods->GetSendStatus(); EXPECT_EQ(status.ok(), true); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) { auto* map = methods->GetRecvInitialMetadata(); bool found = false; // Check that we received the metadata as an echo for (const auto& pair : *map) { found = pair.first.find("testkey") == 0 && pair.second.find("testvalue") == 0; if (found) break; } EXPECT_EQ(found, true); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) { EchoResponse* resp = static_cast(methods->GetRecvMessage()); EXPECT_TRUE(resp->message().find("Hello") == 0); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_CLOSE)) { // Got nothing interesting to do here } methods->Proceed(); } private: experimental::ServerRpcInfo* info_; }; class LoggingInterceptorFactory : public experimental::ServerInterceptorFactoryInterface { public: virtual experimental::Interceptor* CreateServerInterceptor( experimental::ServerRpcInfo* info) override { return new LoggingInterceptor(info); } }; void MakeBidiStreamingCall(const std::shared_ptr& channel) { auto stub = grpc::testing::EchoTestService::NewStub(channel); ClientContext ctx; EchoRequest req; EchoResponse resp; ctx.AddMetadata("testkey", "testvalue"); auto stream = stub->BidiStream(&ctx); for (auto i = 0; i < 10; i++) { req.set_message("Hello" + std::to_string(i)); stream->Write(req); stream->Read(&resp); EXPECT_EQ(req.message(), resp.message()); } ASSERT_TRUE(stream->WritesDone()); Status s = stream->Finish(); EXPECT_EQ(s.ok(), true); } class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test { protected: ServerInterceptorsEnd2endSyncUnaryTest() { int port = grpc_pick_unused_port_or_die(); ServerBuilder builder; server_address_ = "localhost:" + std::to_string(port); builder.AddListeningPort(server_address_, InsecureServerCredentials()); builder.RegisterService(&service_); std::vector< std::unique_ptr> creators; creators.push_back( std::unique_ptr( new LoggingInterceptorFactory())); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr( new DummyInterceptorFactory())); } builder.experimental().SetInterceptorCreators(std::move(creators)); server_ = builder.BuildAndStart(); } std::string server_address_; TestServiceImpl service_; std::unique_ptr server_; }; TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, UnaryTest) { ChannelArguments args; DummyInterceptor::Reset(); auto channel = CreateChannel(server_address_, InsecureChannelCredentials()); MakeCall(channel); // Make sure all 20 dummy interceptors were run EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } class ServerInterceptorsEnd2endSyncStreamingTest : public ::testing::Test { protected: ServerInterceptorsEnd2endSyncStreamingTest() { int port = grpc_pick_unused_port_or_die(); ServerBuilder builder; server_address_ = "localhost:" + std::to_string(port); builder.AddListeningPort(server_address_, InsecureServerCredentials()); builder.RegisterService(&service_); std::vector< std::unique_ptr> creators; creators.push_back( std::unique_ptr( new LoggingInterceptorFactory())); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr( new DummyInterceptorFactory())); } builder.experimental().SetInterceptorCreators(std::move(creators)); server_ = builder.BuildAndStart(); } std::string server_address_; EchoTestServiceStreamingImpl service_; std::unique_ptr server_; }; TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ClientStreamingTest) { ChannelArguments args; DummyInterceptor::Reset(); auto channel = CreateChannel(server_address_, InsecureChannelCredentials()); MakeClientStreamingCall(channel); // Make sure all 20 dummy interceptors were run EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ServerStreamingTest) { ChannelArguments args; DummyInterceptor::Reset(); auto channel = CreateChannel(server_address_, InsecureChannelCredentials()); MakeServerStreamingCall(channel); // Make sure all 20 dummy interceptors were run EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, BidiStreamingTest) { ChannelArguments args; DummyInterceptor::Reset(); auto channel = CreateChannel(server_address_, InsecureChannelCredentials()); MakeBidiStreamingCall(channel); // Make sure all 20 dummy interceptors were run EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } class ServerInterceptorsAsyncEnd2endTest : public ::testing::Test {}; TEST_F(ServerInterceptorsAsyncEnd2endTest, UnaryTest) { DummyInterceptor::Reset(); int port = grpc_pick_unused_port_or_die(); string server_address = "localhost:" + std::to_string(port); ServerBuilder builder; EchoTestService::AsyncService service; builder.AddListeningPort(server_address, InsecureServerCredentials()); builder.RegisterService(&service); std::vector> creators; creators.push_back( std::unique_ptr( new LoggingInterceptorFactory())); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr( new DummyInterceptorFactory())); } builder.experimental().SetInterceptorCreators(std::move(creators)); auto cq = builder.AddCompletionQueue(); auto server = builder.BuildAndStart(); ChannelArguments args; auto channel = CreateChannel(server_address, InsecureChannelCredentials()); auto stub = grpc::testing::EchoTestService::NewStub(channel); EchoRequest send_request; EchoRequest recv_request; EchoResponse send_response; EchoResponse recv_response; Status recv_status; ClientContext cli_ctx; ServerContext srv_ctx; grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); send_request.set_message("Hello"); cli_ctx.AddMetadata("testkey", "testvalue"); std::unique_ptr> response_reader( stub->AsyncEcho(&cli_ctx, send_request, cq.get())); service.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq.get(), cq.get(), tag(2)); response_reader->Finish(&recv_response, &recv_status, tag(4)); Verifier().Expect(2, true).Verify(cq.get()); EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue")); srv_ctx.AddTrailingMetadata("testkey", "testvalue"); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); Verifier().Expect(3, true).Expect(4, true).Verify(cq.get()); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey", "testvalue")); // Make sure all 20 dummy interceptors were run EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); server->Shutdown(); cq->Shutdown(); void* ignored_tag; bool ignored_ok; while (cq->Next(&ignored_tag, &ignored_ok)) ; grpc_recycle_unused_port(port); } TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) { DummyInterceptor::Reset(); int port = grpc_pick_unused_port_or_die(); string server_address = "localhost:" + std::to_string(port); ServerBuilder builder; EchoTestService::AsyncService service; builder.AddListeningPort(server_address, InsecureServerCredentials()); builder.RegisterService(&service); std::vector> creators; creators.push_back( std::unique_ptr( new LoggingInterceptorFactory())); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr( new DummyInterceptorFactory())); } builder.experimental().SetInterceptorCreators(std::move(creators)); auto cq = builder.AddCompletionQueue(); auto server = builder.BuildAndStart(); ChannelArguments args; auto channel = CreateChannel(server_address, InsecureChannelCredentials()); auto stub = grpc::testing::EchoTestService::NewStub(channel); EchoRequest send_request; EchoRequest recv_request; EchoResponse send_response; EchoResponse recv_response; Status recv_status; ClientContext cli_ctx; ServerContext srv_ctx; grpc::ServerAsyncReaderWriter srv_stream(&srv_ctx); send_request.set_message("Hello"); cli_ctx.AddMetadata("testkey", "testvalue"); std::unique_ptr> cli_stream(stub->AsyncBidiStream(&cli_ctx, cq.get(), tag(1))); service.RequestBidiStream(&srv_ctx, &srv_stream, cq.get(), cq.get(), tag(2)); Verifier().Expect(1, true).Expect(2, true).Verify(cq.get()); EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue")); srv_ctx.AddTrailingMetadata("testkey", "testvalue"); cli_stream->Write(send_request, tag(3)); srv_stream.Read(&recv_request, tag(4)); Verifier().Expect(3, true).Expect(4, true).Verify(cq.get()); EXPECT_EQ(send_request.message(), recv_request.message()); send_response.set_message(recv_request.message()); srv_stream.Write(send_response, tag(5)); cli_stream->Read(&recv_response, tag(6)); Verifier().Expect(5, true).Expect(6, true).Verify(cq.get()); EXPECT_EQ(send_response.message(), recv_response.message()); cli_stream->WritesDone(tag(7)); srv_stream.Read(&recv_request, tag(8)); Verifier().Expect(7, true).Expect(8, false).Verify(cq.get()); srv_stream.Finish(Status::OK, tag(9)); cli_stream->Finish(&recv_status, tag(10)); Verifier().Expect(9, true).Expect(10, true).Verify(cq.get()); EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey", "testvalue")); // Make sure all 20 dummy interceptors were run EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); server->Shutdown(); cq->Shutdown(); void* ignored_tag; bool ignored_ok; while (cq->Next(&ignored_tag, &ignored_ok)) ; grpc_recycle_unused_port(port); } } // namespace } // namespace testing } // namespace grpc int main(int argc, char** argv) { grpc_test_init(argc, argv); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }