pull/18909/head
yang-g 6 years ago
parent 68b5260b74
commit 898fc0da1e
  1. 7
      include/grpcpp/impl/codegen/byte_buffer.h
  2. 29
      include/grpcpp/server_builder_impl.h
  3. 11
      include/grpcpp/server_impl.h
  4. 17
      src/cpp/server/external_connection_acceptor_impl.cc
  5. 12
      src/cpp/server/external_connection_acceptor_impl.h
  6. 4
      src/cpp/server/server_builder.cc
  7. 2
      src/cpp/server/server_cc.cc
  8. 8
      test/cpp/end2end/port_sharing_end2end_test.cc

@ -29,10 +29,6 @@
#include <vector>
namespace grpc_impl {
class ExternalConnectionAcceptorImpl;
} // namespace grpc_impl
namespace grpc {
class ServerInterface;
@ -55,6 +51,7 @@ template <class RequestType, class ResponseType>
class CallbackServerStreamingHandler;
template <StatusCode code>
class ErrorMethodHandler;
class ExternalConnectionAcceptorImpl;
template <class R>
class DeserializeFuncType;
class GrpcByteBufferPeer;
@ -189,7 +186,7 @@ class ByteBuffer final {
friend class ProtoBufferReader;
friend class ProtoBufferWriter;
friend class internal::GrpcByteBufferPeer;
friend class ::grpc_impl::ExternalConnectionAcceptorImpl;
friend class internal::ExternalConnectionAcceptorImpl;
grpc_byte_buffer* buffer_;

@ -37,7 +37,6 @@
struct grpc_resource_quota;
namespace grpc_impl {
class ExternalConnectionAcceptorImpl;
class ResourceQuota;
class ServerCredentials;
} // namespace grpc_impl
@ -52,13 +51,17 @@ namespace testing {
class ServerBuilderPluginTest;
} // namespace testing
namespace internal {
class ExternalConnectionAcceptorImpl;
} // namespace internal
namespace experimental {
class CallbackGenericService;
}
// EXPERIMENTAL API:
// Interface for a grpc server to handle connections created out of band.
// See ServerBuilder's AddExternalConnectionAcceptor API for usage.
// Interface for a grpc server to build transports with connections created out
// of band.
// See ServerBuilder's AddExternalConnectionAcceptor API.
class ExternalConnectionAcceptor {
public:
struct NewConnectionParameters {
@ -66,11 +69,12 @@ class ExternalConnectionAcceptor {
ByteBuffer read_buffer; // data intended for the grpc server
};
virtual ~ExternalConnectionAcceptor() {}
// If called before grpc::Server is started, the new connection will be
// closed.
// If called before grpc::Server is started or after it is shut down, the new
// connection will be closed.
virtual void HandleNewConnection(NewConnectionParameters* p) = 0;
};
} // namespace experimental
} // namespace grpc
namespace grpc_impl {
@ -265,13 +269,15 @@ class ServerBuilder {
ServerBuilder& RegisterCallbackGenericService(
grpc::experimental::CallbackGenericService* service);
enum ExternalConnectionType {
enum class ExternalConnectionType {
CONNECTION_FROM_FD = 0 // in the form of a file descriptor
};
// Create an acceptor to take in external connections and pass them to the
// gRPC server.
std::unique_ptr<grpc::ExternalConnectionAcceptor>
/// Register an acceptor to handle the externally accepted connection in
/// grpc server. The returned acceptor can be used to pass the connection
/// to grpc server, where a channel will be created with the provided
/// server credentials.
std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor>
AddExternalConnectionAcceptor(ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds);
@ -374,7 +380,8 @@ class ServerBuilder {
std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
interceptor_creators_;
std::vector<std::shared_ptr<ExternalConnectionAcceptorImpl>> acceptors_;
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors_;
};
} // namespace grpc_impl

@ -42,14 +42,16 @@
struct grpc_server;
namespace grpc {
class AsyncGenericService;
class ServerContext;
namespace internal {
class ExternalConnectionAcceptorImpl;
} // namespace internal
} // namespace grpc
namespace grpc_impl {
class ExternalConnectionAcceptorImpl;
class HealthCheckServiceInterface;
class ServerInitializer;
@ -184,7 +186,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<::grpc_impl::ExternalConnectionAcceptorImpl>>
std::vector<
std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors,
grpc_resource_quota* server_rq = nullptr,
std::vector<std::unique_ptr<
@ -271,7 +274,7 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
grpc_impl::ServerInitializer* initializer();
std::vector<std::shared_ptr<::grpc_impl::ExternalConnectionAcceptorImpl>>
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors_;
// A vector of interceptor factory objects.

@ -23,9 +23,10 @@
#include <grpcpp/server_builder_impl.h>
#include <grpcpp/support/channel_arguments.h>
namespace grpc_impl {
namespace grpc {
namespace internal {
namespace {
class InternalAcceptor : public grpc::ExternalConnectionAcceptor {
class InternalAcceptor : public experimental::ExternalConnectionAcceptor {
public:
explicit InternalAcceptor(
std::shared_ptr<ExternalConnectionAcceptorImpl> impl)
@ -48,17 +49,17 @@ ExternalConnectionAcceptorImpl::ExternalConnectionAcceptorImpl(
CONNECTION_FROM_FD);
}
std::unique_ptr<grpc::ExternalConnectionAcceptor>
std::unique_ptr<experimental::ExternalConnectionAcceptor>
ExternalConnectionAcceptorImpl::GetAcceptor() {
std::lock_guard<std::mutex> lock(mu_);
GPR_ASSERT(!has_acceptor_);
has_acceptor_ = true;
return std::unique_ptr<grpc::ExternalConnectionAcceptor>(
return std::unique_ptr<experimental::ExternalConnectionAcceptor>(
new InternalAcceptor(shared_from_this()));
}
void ExternalConnectionAcceptorImpl::HandleNewConnection(
grpc::ExternalConnectionAcceptor::NewConnectionParameters* p) {
experimental::ExternalConnectionAcceptor::NewConnectionParameters* p) {
std::lock_guard<std::mutex> lock(mu_);
if (shutdown_ || !started_) {
// TODO(yangg) clean up.
@ -86,9 +87,9 @@ void ExternalConnectionAcceptorImpl::Start() {
started_ = true;
}
void ExternalConnectionAcceptorImpl::SetToChannelArgs(
::grpc::ChannelArguments* args) {
void ExternalConnectionAcceptorImpl::SetToChannelArgs(ChannelArguments* args) {
args->SetPointer(name_.c_str(), &handler_);
}
} // namespace grpc_impl
} // namespace internal
} // namespace grpc

@ -30,9 +30,8 @@
#include "src/core/lib/iomgr/tcp_server.h"
namespace grpc_impl {
typedef void (*RawConnectionHandler)(int fd, grpc_byte_buffer* buffer);
namespace grpc {
namespace internal {
class ExternalConnectionAcceptorImpl
: public std::enable_shared_from_this<ExternalConnectionAcceptorImpl> {
@ -42,10 +41,10 @@ class ExternalConnectionAcceptorImpl
ServerBuilder::experimental_type::ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds);
// Should only be called once.
std::unique_ptr<grpc::ExternalConnectionAcceptor> GetAcceptor();
std::unique_ptr<experimental::ExternalConnectionAcceptor> GetAcceptor();
void HandleNewConnection(
grpc::ExternalConnectionAcceptor::NewConnectionParameters* p);
experimental::ExternalConnectionAcceptor::NewConnectionParameters* p);
void Shutdown();
@ -67,6 +66,7 @@ class ExternalConnectionAcceptorImpl
bool shutdown_ = false;
};
} // namespace grpc_impl
} // namespace internal
} // namespace grpc
#endif // SRC_CPP_SERVER_EXTERNAL_CONNECTION_ACCEPTOR_IMPL_H_

@ -116,7 +116,7 @@ ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService(
return *builder_;
}
std::unique_ptr<grpc::ExternalConnectionAcceptor>
std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor>
ServerBuilder::experimental_type::AddExternalConnectionAcceptor(
experimental_type::ExternalConnectionType type,
std::shared_ptr<ServerCredentials> creds) {
@ -124,7 +124,7 @@ ServerBuilder::experimental_type::AddExternalConnectionAcceptor(
char count_str[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str);
builder_->acceptors_.emplace_back(
std::make_shared<ExternalConnectionAcceptorImpl>(
std::make_shared<grpc::internal::ExternalConnectionAcceptorImpl>(
name_prefix.append(count_str), type, creds));
return builder_->acceptors_.back()->GetAcceptor();
}

@ -938,7 +938,7 @@ Server::Server(
std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec,
std::vector<std::shared_ptr<::grpc_impl::ExternalConnectionAcceptorImpl>>
std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
acceptors,
grpc_resource_quota* server_rq,
std::vector<

@ -116,7 +116,8 @@ class TestTcpServer {
const grpc::string& address() { return address_; }
void SetAcceptor(std::unique_ptr<ExternalConnectionAcceptor> acceptor) {
void SetAcceptor(
std::unique_ptr<experimental::ExternalConnectionAcceptor> acceptor) {
connection_acceptor_ = std::move(acceptor);
}
@ -164,7 +165,7 @@ class TestTcpServer {
void OnFdReleased(grpc_error* err) {
EXPECT_EQ(GRPC_ERROR_NONE, err);
ExternalConnectionAcceptor::NewConnectionParameters p;
experimental::ExternalConnectionAcceptor::NewConnectionParameters p;
p.fd = fd_;
if (queue_data_) {
char buf[1024];
@ -190,7 +191,8 @@ class TestTcpServer {
std::thread running_thread_;
int port_;
grpc::string address_;
std::unique_ptr<ExternalConnectionAcceptor> connection_acceptor_;
std::unique_ptr<experimental::ExternalConnectionAcceptor>
connection_acceptor_;
test_tcp_server tcp_server_;
};

Loading…
Cancel
Save