Add listner fd as part of the external connection parameters

pull/19216/head
yang-g 6 years ago
parent b8759c2af4
commit c156653997
  1. 1
      include/grpcpp/server_builder_impl.h
  2. 4
      src/core/lib/iomgr/tcp_server.h
  3. 3
      src/core/lib/iomgr/tcp_server_posix.cc
  4. 2
      src/cpp/server/external_connection_acceptor_impl.cc
  5. 8
      test/cpp/end2end/port_sharing_end2end_test.cc

@ -67,6 +67,7 @@ class CallbackGenericService;
class ExternalConnectionAcceptor {
public:
struct NewConnectionParameters {
int listener_fd = -1;
int fd = -1;
ByteBuffer read_buffer; // data intended for the grpc server
};

@ -41,6 +41,7 @@ typedef struct grpc_tcp_server_acceptor {
unsigned fd_index;
/* Data when the connection is passed to tcp_server from external. */
bool external_connection;
int listener_fd;
grpc_byte_buffer* pending_data;
} grpc_tcp_server_acceptor;
@ -55,7 +56,8 @@ namespace grpc_core {
class TcpServerFdHandler {
public:
virtual ~TcpServerFdHandler() = default;
virtual void Handle(int fd, grpc_byte_buffer* pending_read) GRPC_ABSTRACT;
virtual void Handle(int listener_fd, int fd,
grpc_byte_buffer* pending_read) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS;
};

@ -572,7 +572,7 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
// TODO(yangg) resolve duplicate code with on_read
void Handle(int fd, grpc_byte_buffer* buf) override {
void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
grpc_pollset* read_notifier_pollset;
grpc_resolved_address addr;
char* addr_str;
@ -606,6 +606,7 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
acceptor->port_index = -1;
acceptor->fd_index = -1;
acceptor->external_connection = true;
acceptor->listener_fd = listener_fd;
acceptor->pending_data = buf;
s_->on_accept_cb(s_->on_accept_cb_arg,
grpc_tcp_create(fdobj, s_->channel_args, addr_str),

@ -71,7 +71,7 @@ void ExternalConnectionAcceptorImpl::HandleNewConnection(
return;
}
if (handler_) {
handler_->Handle(p->fd, p->read_buffer.c_buffer());
handler_->Handle(p->listener_fd, p->fd, p->read_buffer.c_buffer());
}
}

@ -159,6 +159,8 @@ class TestTcpServer {
gpr_log(GPR_INFO, "Got incoming connection! from %s", peer);
gpr_free(peer);
EXPECT_FALSE(acceptor->external_connection);
listener_fd_ = grpc_tcp_server_port_fd(
acceptor->from_server, acceptor->port_index, acceptor->fd_index);
gpr_free(acceptor);
grpc_tcp_destroy_and_release_fd(tcp, &fd_, &on_fd_released_);
}
@ -166,6 +168,7 @@ class TestTcpServer {
void OnFdReleased(grpc_error* err) {
EXPECT_EQ(GRPC_ERROR_NONE, err);
experimental::ExternalConnectionAcceptor::NewConnectionParameters p;
p.listener_fd = listener_fd_;
p.fd = fd_;
if (queue_data_) {
char buf[1024];
@ -176,14 +179,15 @@ class TestTcpServer {
Slice data(buf, read_bytes);
p.read_buffer = ByteBuffer(&data, 1);
}
gpr_log(GPR_INFO, "Handing off fd %d with data size %d", fd_,
static_cast<int>(p.read_buffer.Length()));
gpr_log(GPR_INFO, "Handing off fd %d with data size %d from listener fd %d",
fd_, static_cast<int>(p.read_buffer.Length()), listener_fd_);
connection_acceptor_->HandleNewConnection(&p);
}
std::mutex mu_;
bool shutdown_;
int listener_fd_;
int fd_;
bool queue_data_;

Loading…
Cancel
Save