mirror of https://github.com/grpc/grpc.git
The C based gRPC (C++, Python, Ruby, Objective-C, PHP, C#)
https://grpc.io/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
375 lines
12 KiB
375 lines
12 KiB
/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
|
|
|
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <gtest/gtest.h>

#include <cerrno>
#include <mutex>
#include <thread>

#include "src/core/lib/gpr/env.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_tcp_server.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/test_credentials_provider.h"
|
|
|
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER |
|
|
|
#include "src/core/lib/iomgr/tcp_posix.h" |
|
|
|
namespace grpc { |
|
namespace testing { |
|
namespace { |
|
|
|
class TestScenario { |
|
public: |
|
TestScenario(bool server_port, bool pending_data, |
|
const grpc::string& creds_type) |
|
: server_has_port(server_port), |
|
queue_pending_data(pending_data), |
|
credentials_type(creds_type) {} |
|
void Log() const; |
|
// server has its own port or not |
|
bool server_has_port; |
|
// whether tcp server should read some data before handoff |
|
bool queue_pending_data; |
|
const grpc::string credentials_type; |
|
}; |
|
|
|
static std::ostream& operator<<(std::ostream& out, |
|
const TestScenario& scenario) { |
|
return out << "TestScenario{server_has_port=" |
|
<< (scenario.server_has_port ? "true" : "false") |
|
<< ", queue_pending_data=" |
|
<< (scenario.queue_pending_data ? "true" : "false") |
|
<< ", credentials='" << scenario.credentials_type << "'}"; |
|
} |
|
|
|
// Formats this scenario through operator<< and emits it via gpr_log.
void TestScenario::Log() const {
  std::ostringstream description;
  description << *this;
  const auto text = description.str();
  gpr_log(GPR_ERROR, "%s", text.c_str());
}
|
|
|
// Set up a test tcp server which is in charge of accepting connections and |
|
// handing off the connections as fds. |
|
class TestTcpServer { |
|
public: |
|
TestTcpServer() |
|
: shutdown_(false), |
|
queue_data_(false), |
|
port_(grpc_pick_unused_port_or_die()) { |
|
std::ostringstream server_address; |
|
server_address << "localhost:" << port_; |
|
address_ = server_address.str(); |
|
test_tcp_server_init(&tcp_server_, &TestTcpServer::OnConnect, this); |
|
GRPC_CLOSURE_INIT(&on_fd_released_, &TestTcpServer::OnFdReleased, this, |
|
grpc_schedule_on_exec_ctx); |
|
} |
|
|
|
~TestTcpServer() { |
|
running_thread_.join(); |
|
test_tcp_server_destroy(&tcp_server_); |
|
grpc_recycle_unused_port(port_); |
|
} |
|
|
|
// Read some data before handing off the connection. |
|
void SetQueueData() { queue_data_ = true; } |
|
|
|
void Start() { |
|
test_tcp_server_start(&tcp_server_, port_); |
|
gpr_log(GPR_INFO, "Test TCP server started at %s", address_.c_str()); |
|
} |
|
|
|
const grpc::string& address() { return address_; } |
|
|
|
void SetAcceptor( |
|
std::unique_ptr<experimental::ExternalConnectionAcceptor> acceptor) { |
|
connection_acceptor_ = std::move(acceptor); |
|
} |
|
|
|
void Run() { |
|
running_thread_ = std::thread([this]() { |
|
while (true) { |
|
{ |
|
std::lock_guard<std::mutex> lock(mu_); |
|
if (shutdown_) { |
|
return; |
|
} |
|
} |
|
test_tcp_server_poll(&tcp_server_, 1); |
|
} |
|
}); |
|
} |
|
|
|
void Shutdown() { |
|
std::lock_guard<std::mutex> lock(mu_); |
|
shutdown_ = true; |
|
} |
|
|
|
static void OnConnect(void* arg, grpc_endpoint* tcp, |
|
grpc_pollset* accepting_pollset, |
|
grpc_tcp_server_acceptor* acceptor) { |
|
auto* self = static_cast<TestTcpServer*>(arg); |
|
self->OnConnect(tcp, accepting_pollset, acceptor); |
|
} |
|
|
|
static void OnFdReleased(void* arg, grpc_error* err) { |
|
auto* self = static_cast<TestTcpServer*>(arg); |
|
self->OnFdReleased(err); |
|
} |
|
|
|
private: |
|
void OnConnect(grpc_endpoint* tcp, grpc_pollset* /*accepting_pollset*/, |
|
grpc_tcp_server_acceptor* acceptor) { |
|
char* peer = grpc_endpoint_get_peer(tcp); |
|
gpr_log(GPR_INFO, "Got incoming connection! from %s", peer); |
|
gpr_free(peer); |
|
EXPECT_FALSE(acceptor->external_connection); |
|
listener_fd_ = grpc_tcp_server_port_fd( |
|
acceptor->from_server, acceptor->port_index, acceptor->fd_index); |
|
gpr_free(acceptor); |
|
grpc_tcp_destroy_and_release_fd(tcp, &fd_, &on_fd_released_); |
|
} |
|
|
|
void OnFdReleased(grpc_error* err) { |
|
EXPECT_EQ(GRPC_ERROR_NONE, err); |
|
experimental::ExternalConnectionAcceptor::NewConnectionParameters p; |
|
p.listener_fd = listener_fd_; |
|
p.fd = fd_; |
|
if (queue_data_) { |
|
char buf[1024]; |
|
ssize_t read_bytes = 0; |
|
while (read_bytes <= 0) { |
|
read_bytes = read(fd_, buf, 1024); |
|
} |
|
Slice data(buf, read_bytes); |
|
p.read_buffer = ByteBuffer(&data, 1); |
|
} |
|
gpr_log(GPR_INFO, "Handing off fd %d with data size %d from listener fd %d", |
|
fd_, static_cast<int>(p.read_buffer.Length()), listener_fd_); |
|
connection_acceptor_->HandleNewConnection(&p); |
|
} |
|
|
|
std::mutex mu_; |
|
bool shutdown_; |
|
|
|
int listener_fd_ = -1; |
|
int fd_ = -1; |
|
bool queue_data_ = false; |
|
|
|
grpc_closure on_fd_released_; |
|
std::thread running_thread_; |
|
int port_ = -1; |
|
grpc::string address_; |
|
std::unique_ptr<experimental::ExternalConnectionAcceptor> |
|
connection_acceptor_; |
|
test_tcp_server tcp_server_; |
|
}; |
|
|
|
// Fixture that runs one gRPC server fed by two TestTcpServer instances
// (connection handoff) and, depending on the scenario, by a listening port of
// its own.
class PortSharingEnd2endTest : public ::testing::TestWithParam<TestScenario> {
 protected:
  PortSharingEnd2endTest() : is_server_started_(false), first_picked_port_(0) {
    GetParam().Log();
  }

  // Starts both tcp servers, builds the gRPC server with one external
  // connection acceptor per tcp server, and launches the tcp pollers.
  void SetUp() override {
    if (GetParam().queue_pending_data) {
      tcp_server1_.SetQueueData();
      tcp_server2_.SetQueueData();
    }
    tcp_server1_.Start();
    tcp_server2_.Start();
    ServerBuilder builder;
    if (GetParam().server_has_port) {
      int port = grpc_pick_unused_port_or_die();
      first_picked_port_ = port;
      server_address_ << "localhost:" << port;
      auto creds = GetCredentialsProvider()->GetServerCredentials(
          GetParam().credentials_type);
      builder.AddListeningPort(server_address_.str(), creds);
      gpr_log(GPR_INFO, "gRPC server listening on %s",
              server_address_.str().c_str());
    }
    auto server_creds = GetCredentialsProvider()->GetServerCredentials(
        GetParam().credentials_type);
    auto acceptor1 = builder.experimental().AddExternalConnectionAcceptor(
        ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
        server_creds);
    tcp_server1_.SetAcceptor(std::move(acceptor1));
    auto acceptor2 = builder.experimental().AddExternalConnectionAcceptor(
        ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
        server_creds);
    tcp_server2_.SetAcceptor(std::move(acceptor2));
    builder.RegisterService(&service_);
    server_ = builder.BuildAndStart();
    // Record whether a server object was actually produced so TearDown() never
    // dereferences a null server_.
    is_server_started_ = (server_ != nullptr);

    // The pollers are started before the assertion below so that the tcp
    // servers always own a joinable thread when they are destroyed.
    tcp_server1_.Run();
    tcp_server2_.Run();

    ASSERT_TRUE(is_server_started_) << "gRPC server failed to start";
  }

  // Stops the tcp pollers and the gRPC server, and returns the server's own
  // port (if one was picked) to the pool.
  void TearDown() override {
    tcp_server1_.Shutdown();
    tcp_server2_.Shutdown();
    if (is_server_started_) {
      server_->Shutdown();
    }
    if (first_picked_port_ > 0) {
      grpc_recycle_unused_port(first_picked_port_);
    }
  }

  // (Re)creates the channels and stubs. Each channel uses a local subchannel
  // pool (GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL = 1).
  void ResetStubs() {
    EXPECT_TRUE(is_server_started_);
    ChannelArguments args;
    args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
    auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
        GetParam().credentials_type, &args);
    channel_handoff1_ =
        CreateCustomChannel(tcp_server1_.address(), channel_creds, args);
    stub_handoff1_ = EchoTestService::NewStub(channel_handoff1_);
    channel_handoff2_ =
        CreateCustomChannel(tcp_server2_.address(), channel_creds, args);
    stub_handoff2_ = EchoTestService::NewStub(channel_handoff2_);
    if (GetParam().server_has_port) {
      ChannelArguments direct_args;
      direct_args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
      auto direct_creds = GetCredentialsProvider()->GetChannelCredentials(
          GetParam().credentials_type, &direct_args);
      channel_direct_ =
          CreateCustomChannel(server_address_.str(), direct_creds, direct_args);
      stub_direct_ = EchoTestService::NewStub(channel_direct_);
    }
  }

  // True only when BuildAndStart() produced a server object.
  bool is_server_started_;
  // channel/stub to the test tcp server, the connection will be handed to the
  // grpc server.
  std::shared_ptr<Channel> channel_handoff1_;
  std::unique_ptr<EchoTestService::Stub> stub_handoff1_;
  std::shared_ptr<Channel> channel_handoff2_;
  std::unique_ptr<EchoTestService::Stub> stub_handoff2_;
  // channel/stub to talk to the grpc server directly, if applicable.
  std::shared_ptr<Channel> channel_direct_;
  std::unique_ptr<EchoTestService::Stub> stub_direct_;
  std::unique_ptr<Server> server_;
  std::ostringstream server_address_;
  TestServiceImpl service_;
  TestTcpServer tcp_server1_;
  TestTcpServer tcp_server2_;
  // Port picked for the server's own listener; 0 when none was picked.
  int first_picked_port_;
};
|
|
|
// Issues |num_rpcs| sequential Echo calls on |stub| and expects each one to
// succeed and to echo the request message back.
static void SendRpc(EchoTestService::Stub* stub, int num_rpcs) {
  EchoRequest request;
  request.set_message("Hello hello hello hello");
  EchoResponse response;

  int remaining = num_rpcs;
  while (remaining-- > 0) {
    ClientContext context;
    const Status status = stub->Echo(&context, request, &response);
    EXPECT_EQ(response.message(), request.message());
    EXPECT_TRUE(status.ok());
  }
}
|
|
|
std::vector<TestScenario> CreateTestScenarios() { |
|
std::vector<TestScenario> scenarios; |
|
std::vector<grpc::string> credentials_types; |
|
|
|
#if TARGET_OS_IPHONE |
|
// Workaround Apple CFStream bug |
|
gpr_setenv("grpc_cfstream", "0"); |
|
#endif |
|
|
|
credentials_types = GetCredentialsProvider()->GetSecureCredentialsTypeList(); |
|
// Only allow insecure credentials type when it is registered with the |
|
// provider. User may create providers that do not have insecure. |
|
if (GetCredentialsProvider()->GetChannelCredentials(kInsecureCredentialsType, |
|
nullptr) != nullptr) { |
|
credentials_types.push_back(kInsecureCredentialsType); |
|
} |
|
|
|
GPR_ASSERT(!credentials_types.empty()); |
|
for (const auto& cred : credentials_types) { |
|
for (auto server_has_port : {true, false}) { |
|
for (auto queue_pending_data : {true, false}) { |
|
scenarios.emplace_back(server_has_port, queue_pending_data, cred); |
|
} |
|
} |
|
} |
|
return scenarios; |
|
} |
|
|
|
// Sends RPCs over a handed-off connection and, when the server has its own
// port, over a direct connection as well.
TEST_P(PortSharingEnd2endTest, HandoffAndDirectCalls) {
  constexpr int kRpcsPerStub = 5;
  ResetStubs();
  SendRpc(stub_handoff1_.get(), kRpcsPerStub);
  if (!GetParam().server_has_port) {
    return;
  }
  SendRpc(stub_direct_.get(), kRpcsPerStub);
}
|
|
|
// Recreates the stubs several times and sends one RPC through the second tcp
// server after each reset.
TEST_P(PortSharingEnd2endTest, MultipleHandoff) {
  constexpr int kRounds = 3;
  for (int round = 0; round != kRounds; ++round) {
    ResetStubs();
    SendRpc(stub_handoff2_.get(), 1);
  }
}
|
|
|
// Exercises both tcp servers in turn, recreating the stubs before each round.
TEST_P(PortSharingEnd2endTest, TwoHandoffPorts) {
  constexpr int kRounds = 3;
  constexpr int kRpcsPerStub = 5;
  for (int round = 0; round != kRounds; ++round) {
    ResetStubs();
    for (EchoTestService::Stub* stub :
         {stub_handoff1_.get(), stub_handoff2_.get()}) {
      SendRpc(stub, kRpcsPerStub);
    }
  }
}
|
|
|
// Instantiates every TEST_P of the fixture once per scenario returned by
// CreateTestScenarios().
INSTANTIATE_TEST_SUITE_P(
    PortSharingEnd2end, PortSharingEnd2endTest,
    ::testing::ValuesIn(CreateTestScenarios()));
|
|
|
} // namespace |
|
} // namespace testing |
|
} // namespace grpc |
|
|
|
#endif // GRPC_POSIX_SOCKET_TCP_SERVER |
|
|
|
int main(int argc, char** argv) { |
|
grpc::testing::TestEnvironment env(argc, argv); |
|
::testing::InitGoogleTest(&argc, argv); |
|
return RUN_ALL_TESTS(); |
|
}
|
|
|