From 02ca015b1328f1ab473eefb5b72f1314ddaef959 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 21 Oct 2020 01:29:57 -0700 Subject: [PATCH] Experimental support and tests for CreateCustomInsecureChannelWithInterceptorsFromFd --- include/grpcpp/create_channel_posix.h | 4 +- .../client_interceptors_end2end_test.cc | 100 ++++++++++++++---- 2 files changed, 83 insertions(+), 21 deletions(-) diff --git a/include/grpcpp/create_channel_posix.h b/include/grpcpp/create_channel_posix.h index e8c82b37d24..8c34330d976 100644 --- a/include/grpcpp/create_channel_posix.h +++ b/include/grpcpp/create_channel_posix.h @@ -57,8 +57,8 @@ namespace experimental { std::shared_ptr CreateCustomInsecureChannelWithInterceptorsFromFd( const std::string& target, int fd, const grpc::ChannelArguments& args, - std::unique_ptr>> + std::vector< + std::unique_ptr> interceptor_creators); } // namespace experimental diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 56839c7eafe..8bc81bfb361 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -19,17 +19,21 @@ #include #include +#include "absl/memory/memory.h" + #include #include #include +#include #include #include #include #include #include +#include #include -#include "absl/memory/memory.h" +#include "src/core/lib/iomgr/port.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -38,6 +42,11 @@ #include "test/cpp/util/byte_buffer_proto_helper.h" #include "test/cpp/util/string_ref_helper.h" +#ifdef GRPC_POSIX_SOCKET +#include +#include "src/core/lib/iomgr/socket_utils_posix.h" +#endif /* GRPC_POSIX_SOCKET */ + #include namespace grpc { @@ -55,6 +64,11 @@ enum class RPCType { kAsyncCQBidiStreaming, }; +enum class ChannelType { + kHttpChannel, + kFdChannel, +}; + /* Hijacks Echo RPC and fills in the expected values */ class HijackingInterceptor : public experimental::Interceptor { public: @@ -686,22 +700,35 @@ class LoggingInterceptorFactory class TestScenario { public: - explicit TestScenario(const RPCType& type) : type_(type) {} + explicit TestScenario(const ChannelType& channel_type, + const RPCType& rpc_type) + : channel_type_(channel_type), rpc_type_(rpc_type) {} - RPCType type() const { return type_; } + ChannelType channel_type() const { return channel_type_; } + + RPCType rpc_type() const { return rpc_type_; } private: - RPCType type_; + const ChannelType channel_type_; + const RPCType rpc_type_; }; std::vector CreateTestScenarios() { std::vector scenarios; - scenarios.emplace_back(RPCType::kSyncUnary); - scenarios.emplace_back(RPCType::kSyncClientStreaming); - scenarios.emplace_back(RPCType::kSyncServerStreaming); - scenarios.emplace_back(RPCType::kSyncBidiStreaming); - scenarios.emplace_back(RPCType::kAsyncCQUnary); - scenarios.emplace_back(RPCType::kAsyncCQServerStreaming); + std::vector rpc_types; + rpc_types.emplace_back(RPCType::kSyncUnary); + rpc_types.emplace_back(RPCType::kSyncClientStreaming); + rpc_types.emplace_back(RPCType::kSyncServerStreaming); + rpc_types.emplace_back(RPCType::kSyncBidiStreaming); + rpc_types.emplace_back(RPCType::kAsyncCQUnary); + rpc_types.emplace_back(RPCType::kAsyncCQServerStreaming); + for (const auto& rpc_type : rpc_types) { + scenarios.emplace_back(ChannelType::kHttpChannel, rpc_type); +// TODO(yashykt): Maybe add support for non-posix sockets too +#ifdef GRPC_POSIX_SOCKET + scenarios.emplace_back(ChannelType::kFdChannel, rpc_type); +#endif /* GRPC_POSIX_SOCKET */ + } return scenarios; } @@ -709,21 +736,56 @@ class ParameterizedClientInterceptorsEnd2endTest : public ::testing::TestWithParam { protected: ParameterizedClientInterceptorsEnd2endTest() { - int port = grpc_pick_unused_port_or_die(); - ServerBuilder builder; - server_address_ = "localhost:" + std::to_string(port); - builder.AddListeningPort(server_address_, InsecureServerCredentials()); builder.RegisterService(&service_); - server_ = builder.BuildAndStart(); + if (GetParam().channel_type() == ChannelType::kHttpChannel) { + int port = grpc_pick_unused_port_or_die(); + server_address_ = "localhost:" + std::to_string(port); + builder.AddListeningPort(server_address_, InsecureServerCredentials()); + server_ = builder.BuildAndStart(); + } +#ifdef GRPC_POSIX_SOCKET + else if (GetParam().channel_type() == ChannelType::kFdChannel) { + int flags; + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv_) == 0); + flags = fcntl(sv_[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv_[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv_[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv_[1], F_SETFL, flags | O_NONBLOCK) == 0); + GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv_[0]) == + GRPC_ERROR_NONE); + GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv_[1]) == + GRPC_ERROR_NONE); + server_ = builder.BuildAndStart(); + AddInsecureChannelFromFd(server_.get(), sv_[1]); + } +#endif /* GRPC_POSIX_SOCKET */ } ~ParameterizedClientInterceptorsEnd2endTest() override { server_->Shutdown(); } + std::shared_ptr CreateClientChannel( + std::vector> + creators) { + if (GetParam().channel_type() == ChannelType::kHttpChannel) { + return experimental::CreateCustomChannelWithInterceptors( + server_address_, InsecureChannelCredentials(), ChannelArguments(), + std::move(creators)); + } +#ifdef GRPC_POSIX_SOCKET + else if (GetParam().channel_type() == ChannelType::kFdChannel) { + return experimental::CreateCustomInsecureChannelWithInterceptorsFromFd( + "", sv_[0], ChannelArguments(), std::move(creators)); + } +#endif /* GRPC_POSIX_SOCKET */ + return nullptr; + } + void SendRPC(const std::shared_ptr& channel) { - switch (GetParam().type()) { + switch (GetParam().rpc_type()) { case RPCType::kSyncUnary: MakeCall(channel); break; @@ -752,6 +814,7 @@ class ParameterizedClientInterceptorsEnd2endTest } std::string server_address_; + int sv_[2]; EchoTestServiceStreamingImpl service_; std::unique_ptr server_; }; @@ -767,10 +830,9 @@ TEST_P(ParameterizedClientInterceptorsEnd2endTest, for (auto i = 0; i < 20; i++) { creators.push_back(absl::make_unique()); } - auto channel = experimental::CreateCustomChannelWithInterceptors( - server_address_, InsecureChannelCredentials(), args, std::move(creators)); + auto channel = CreateClientChannel(std::move(creators)); SendRPC(channel); - LoggingInterceptor::VerifyCall(GetParam().type()); + LoggingInterceptor::VerifyCall(GetParam().rpc_type()); // Make sure all 20 dummy interceptors were run EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); }