Refactors grpc udp_server_listener to be object oriented. Also adds a mutex to each listener. Instead of sharing the mutex in udp_server for all listeners, this per-listener mutex can make most of the call to different listeners in parallel.

pull/14544/head
Dan Zhang 7 years ago
parent 0fc97adc9e
commit b293e9e822
  1. 482
      src/core/lib/iomgr/udp_server.cc
  2. 61
      src/core/lib/iomgr/udp_server.h
  3. 108
      test/core/iomgr/udp_server_test.cc

@ -52,6 +52,8 @@
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
@ -62,41 +64,104 @@
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
/* one listening port */
typedef struct grpc_udp_listener grpc_udp_listener;
struct grpc_udp_listener {
int fd;
grpc_fd* emfd;
grpc_udp_server* server;
grpc_resolved_address addr;
grpc_closure read_closure;
grpc_closure write_closure;
/* A listener which implements basic features of Listening on a port for
* I/O events*/
class GrpcUdpListener {
public:
GrpcUdpListener(grpc_udp_server* server, int fd,
const grpc_resolved_address* addr);
~GrpcUdpListener();
/* Called when grpc server starts to listening on the grpc_fd. */
void StartListening(grpc_pollset** pollsets, size_t pollset_count,
GrpcUdpHandlerFactory* handler_factory);
/* Called when data is available to read from the socket.
* Return true if there is more data to read from fd. */
void OnRead(grpc_error* error, void* do_read_arg);
/* Called when the socket is writeable. The given closure should be scheduled
* when the socket becomes blocked next time. */
void OnCanWrite(grpc_error* error, void* do_write_arg);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
void OnFdAboutToOrphan();
/* Called to orphan fd of this listener.*/
void OrphanFd();
/* Called when this listener is going to be destroyed. */
void OnDestroy();
int fd() const { return fd_; }
protected:
grpc_fd* emfd() const { return emfd_; }
gpr_mu* mutex() { return &mutex_; }
private:
/* event manager callback when reads are ready */
static void on_read(void* arg, grpc_error* error);
static void on_write(void* arg, grpc_error* error);
static void do_read(void* arg, grpc_error* error);
static void do_write(void* arg, grpc_error* error);
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback
// interface.
static void fd_notify_on_write_wrapper(void* arg, grpc_error* error);
static void shutdown_fd(void* args, grpc_error* error);
int fd_;
grpc_fd* emfd_;
grpc_udp_server* server_;
grpc_resolved_address addr_;
grpc_closure read_closure_;
grpc_closure write_closure_;
// To be called when corresponding QuicGrpcServer closes all active
// connections.
grpc_closure orphan_fd_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
grpc_udp_server_start_cb start_cb;
// To be scheduled on another thread to actually read/write.
grpc_closure do_read_closure;
grpc_closure do_write_closure;
grpc_closure notify_on_write_closure;
grpc_closure orphan_fd_closure_;
grpc_closure destroyed_closure_;
// To be scheduled on another thread to actually read/write.
grpc_closure do_read_closure_;
grpc_closure do_write_closure_;
grpc_closure notify_on_write_closure_;
// True if orphan_cb is trigered.
bool orphan_notified;
bool orphan_notified_;
// True if grpc_fd_notify_on_write() is called after on_write() call.
bool notify_on_write_armed;
bool notify_on_write_armed_;
// True if fd has been shutdown.
bool already_shutdown;
struct grpc_udp_listener* next;
bool already_shutdown_;
// Object actually handles I/O events. Assigned in StartListening().
GrpcUdpHandler* udp_handler_ = nullptr;
// To be notified on destruction.
GrpcUdpHandlerFactory* handler_factory_ = nullptr;
// Required to access above fields.
gpr_mu mutex_;
};
struct shutdown_fd_args {
grpc_udp_listener* sp;
gpr_mu* server_mu;
};
GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
const grpc_resolved_address* addr)
: fd_(fd),
server_(server),
orphan_notified_(false),
already_shutdown_(false) {
char* addr_str;
char* name;
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
emfd_ = grpc_fd_create(fd, name);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);
gpr_mu_init(&mutex_);
}
GrpcUdpListener::~GrpcUdpListener() {
gpr_mu_destroy(&mutex_);
}
/* the overall server */
struct grpc_udp_server {
@ -113,10 +178,11 @@ struct grpc_udp_server {
/* is this server shutting down? (boolean) */
int shutdown;
/* linked list of server ports */
grpc_udp_listener* head;
grpc_udp_listener* tail;
unsigned nports;
/* An array of listeners */
grpc_core::InlinedVector<GrpcUdpListener, 16> listeners;
/* factory for use to create udp listeners */
GrpcUdpHandlerFactory* handler_factory;
/* shutdown callback */
grpc_closure* shutdown_complete;
@ -141,8 +207,7 @@ static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) {
}
grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
grpc_udp_server* s =
static_cast<grpc_udp_server*>(gpr_malloc(sizeof(grpc_udp_server)));
grpc_udp_server* s = grpc_core::New<grpc_udp_server>();
gpr_mu_init(&s->mu);
s->socket_factory = get_socket_factory(args);
if (s->socket_factory) {
@ -151,34 +216,29 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
s->head = nullptr;
s->tail = nullptr;
s->nports = 0;
return s;
}
static void shutdown_fd(void* args, grpc_error* error) {
struct shutdown_fd_args* shutdown_args =
static_cast<struct shutdown_fd_args*>(args);
grpc_udp_listener* sp = shutdown_args->sp;
gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd);
gpr_mu_lock(shutdown_args->server_mu);
grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error));
sp->already_shutdown = true;
if (!sp->notify_on_write_armed) {
// static
void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) {
if (args == nullptr) {
// No-op if shutdown args are null.
return;
}
auto sp = static_cast<GrpcUdpListener*>(args);
gpr_mu_lock(sp->mutex());
gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd_);
grpc_fd_shutdown(sp->emfd_, GRPC_ERROR_REF(error));
sp->already_shutdown_ = true;
if (!sp->notify_on_write_armed_) {
// Re-arm write notification to notify listener with error. This is
// necessary to decrement active_ports.
sp->notify_on_write_armed = true;
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
sp->notify_on_write_armed_ = true;
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
}
gpr_mu_unlock(shutdown_args->server_mu);
gpr_free(shutdown_args);
gpr_mu_unlock(sp->mutex());
}
static void dummy_cb(void* arg, grpc_error* error) {
// No-op.
}
static void finish_shutdown(grpc_udp_server* s) {
if (s->shutdown_complete != nullptr) {
@ -188,24 +248,23 @@ static void finish_shutdown(grpc_udp_server* s) {
gpr_mu_destroy(&s->mu);
gpr_log(GPR_DEBUG, "Destroy all listeners.");
while (s->head) {
grpc_udp_listener* sp = s->head;
s->head = sp->next;
gpr_free(sp);
for(size_t i = 0; i < s->listeners.size(); ++i) {
s->listeners[i].OnDestroy();
}
if (s->socket_factory) {
grpc_socket_factory_unref(s->socket_factory);
}
gpr_free(s);
grpc_core::Delete(s);
}
static void destroyed_port(void* server, grpc_error* error) {
grpc_udp_server* s = static_cast<grpc_udp_server*>(server);
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
if (s->destroyed_ports == s->nports) {
if (s->destroyed_ports == s->listeners.size()) {
gpr_mu_unlock(&s->mu);
finish_shutdown(s);
} else {
@ -222,35 +281,30 @@ static void deactivated_all_ports(grpc_udp_server* s) {
GPR_ASSERT(s->shutdown);
if (s->head) {
grpc_udp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
if (!sp->orphan_notified) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. Because at this point, all listening ports
* have been shutdown already, no need to shutdown again.*/
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb);
gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd);
sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data);
}
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
false /* already_closed */, "udp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
if (s->listeners.size() == 0) {
gpr_mu_unlock(&s->mu);
finish_shutdown(s);
return;
}
for (size_t i = 0; i < s->listeners.size(); ++i) {
s->listeners[i].OrphanFd();
}
gpr_mu_unlock(&s->mu);
}
void GrpcUdpListener::OrphanFd() {
gpr_log(GPR_DEBUG, "Orphan fd %d, emfd %p", fd_, emfd_);
grpc_unlink_if_unix_domain_socket(&addr_);
GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_,
grpc_schedule_on_exec_ctx);
/* Because at this point, all listening sockets have been shutdown already, no
* need to call OnFdAboutToOrphan() to notify the handler again. */
grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr,
false /* already_closed */, "udp_listener_shutdown");
}
void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
grpc_udp_listener* sp;
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
@ -261,16 +315,9 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
gpr_log(GPR_DEBUG, "start to destroy udp_server");
/* shutdown all fd's */
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
struct shutdown_fd_args* args =
static_cast<struct shutdown_fd_args*>(gpr_malloc(sizeof(*args)));
args->sp = sp;
args->server_mu = &s->mu;
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
grpc_schedule_on_exec_ctx);
sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data);
sp->orphan_notified = true;
for (size_t i = 0; i < s->listeners.size(); ++i) {
GrpcUdpListener* sp = &s->listeners[i];
sp->OnFdAboutToOrphan();
}
gpr_mu_unlock(&s->mu);
} else {
@ -279,6 +326,24 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
}
}
void GrpcUdpListener::OnFdAboutToOrphan() {
gpr_mu_lock(&mutex_);
grpc_unlink_if_unix_domain_socket(&addr_);
GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_,
grpc_schedule_on_exec_ctx);
if (!orphan_notified_ && udp_handler_ != nullptr) {
/* Singals udp_handler that the FD is about to be closed and
* should no longer be used. */
GRPC_CLOSURE_INIT(&orphan_fd_closure_, shutdown_fd, this,
grpc_schedule_on_exec_ctx);
gpr_log(GPR_DEBUG, "fd %d about to be orphaned", fd_);
udp_handler_->OnFdAboutToOrphan(&orphan_fd_closure_, server_->user_data);
orphan_notified_ = true;
}
gpr_mu_unlock(&mutex_);
}
static int bind_socket(grpc_socket_factory* socket_factory, int sockfd,
const grpc_resolved_address* addr) {
return (socket_factory != nullptr)
@ -364,163 +429,140 @@ error:
return -1;
}
static void do_read(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE);
// static
void GrpcUdpListener::do_read(void* arg, grpc_error* error) {
GrpcUdpListener* sp = reinterpret_cast<GrpcUdpListener*>(arg);
GPR_ASSERT(error == GRPC_ERROR_NONE);
/* TODO: the reason we hold server->mu here is merely to prevent fd
* shutdown while we are reading. However, it blocks do_write(). Switch to
* read lock if available. */
gpr_mu_lock(&sp->server->mu);
gpr_mu_lock(sp->mutex());
/* Tell the registered callback that data is available to read. */
if (!sp->already_shutdown && sp->read_cb(sp->emfd)) {
if (!sp->already_shutdown_ && sp->udp_handler_->Read()) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
* get another chance to read. Or fd already shutdown, re-arm to get a
* notification with shutdown error. */
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
grpc_fd_notify_on_read(sp->emfd_, &sp->read_closure_);
}
gpr_mu_unlock(&sp->server->mu);
gpr_mu_unlock(sp->mutex());
}
/* event manager callback when reads are ready */
static void on_read(void* arg, grpc_error* error) {
grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg);
// static
void GrpcUdpListener::on_read(void* arg, grpc_error* error) {
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
sp->OnRead(error, arg);
}
gpr_mu_lock(&sp->server->mu);
void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) {
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
deactivated_all_ports(sp->server);
gpr_mu_lock(&server_->mu);
if (0 == --server_->active_ports && server_->shutdown) {
gpr_mu_unlock(&server_->mu);
deactivated_all_ports(server_);
} else {
gpr_mu_unlock(&sp->server->mu);
gpr_mu_unlock(&server_->mu);
}
return;
}
/* Read once. If there is more data to read, off load the work to another
* thread to finish. */
GPR_ASSERT(sp->read_cb);
if (sp->read_cb(sp->emfd)) {
if (udp_handler_->Read()) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
* get another chance to read. Or fd already shutdown, re-arm to get a
* notification with shutdown error. */
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
grpc_fd_notify_on_read(emfd_, &read_closure_);
}
gpr_mu_unlock(&sp->server->mu);
}
// static
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
gpr_mu_lock(&sp->server->mu);
if (!sp->notify_on_write_armed) {
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
sp->notify_on_write_armed = true;
}
gpr_mu_unlock(&sp->server->mu);
void GrpcUdpListener::fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
GrpcUdpListener* sp = reinterpret_cast<GrpcUdpListener*>(arg);
gpr_mu_lock(sp->mutex());
if (!sp->notify_on_write_armed_) {
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
sp->notify_on_write_armed_ = true;
}
gpr_mu_unlock(sp->mutex());
}
static void do_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
gpr_mu_lock(&sp->server->mu);
if (sp->already_shutdown) {
// static
void GrpcUdpListener::do_write(void* arg, grpc_error* error) {
GrpcUdpListener* sp = reinterpret_cast<GrpcUdpListener*>(arg);
gpr_mu_lock(sp->mutex());
if (sp->already_shutdown_) {
// If fd has been shutdown, don't write any more and re-arm notification.
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
} else {
sp->notify_on_write_armed = false;
sp->notify_on_write_armed_ = false;
/* Tell the registered callback that the socket is writeable. */
GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper,
GPR_ASSERT(error == GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&sp->notify_on_write_closure_, fd_notify_on_write_wrapper,
arg, grpc_schedule_on_exec_ctx);
sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure);
sp->udp_handler_->OnCanWrite(sp->server_->user_data,
&sp->notify_on_write_closure_);
}
gpr_mu_unlock(&sp->server->mu);
gpr_mu_unlock(sp->mutex());
}
static void on_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg);
// static
void GrpcUdpListener::on_write(void* arg, grpc_error* error) {
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
sp->OnCanWrite(error, arg);
}
gpr_mu_lock(&sp->server->mu);
void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) {
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
deactivated_all_ports(sp->server);
gpr_mu_lock(&server_->mu);
if (0 == --server_->active_ports && server_->shutdown) {
gpr_mu_unlock(&server_->mu);
deactivated_all_ports(server_);
} else {
gpr_mu_unlock(&sp->server->mu);
gpr_mu_unlock(&server_->mu);
}
return;
}
/* Schedule actual write in another thread. */
GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg,
GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&sp->server->mu);
GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE);
}
static int add_socket_to_server(grpc_udp_server* s, int fd,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
grpc_udp_listener* sp;
int port;
char* addr_str;
char* name;
int rcv_buf_size, int snd_buf_size) {
gpr_log(GPR_DEBUG, "add socket %d to server", fd);
port =
int port =
prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size);
if (port >= 0) {
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
gpr_mu_lock(&s->mu);
s->nports++;
sp = static_cast<grpc_udp_listener*>(gpr_malloc(sizeof(grpc_udp_listener)));
sp->next = nullptr;
if (s->head == nullptr) {
s->head = sp;
} else {
s->tail->next = sp;
}
s->tail = sp;
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
sp->start_cb = start_cb;
sp->orphan_notified = false;
sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
s->listeners.emplace_back(s, fd, addr);
gpr_log(GPR_DEBUG,
"add socket %d to server for port %d, %zu listener(s) in total", fd,
port, s->listeners.size());
gpr_mu_unlock(&s->mu);
gpr_free(name);
}
return port;
}
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
grpc_udp_listener* sp;
GrpcUdpHandlerFactory* handler_factory) {
int allocated_port1 = -1;
int allocated_port2 = -1;
int fd;
@ -536,10 +578,10 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
for (sp = s->head; sp; sp = sp->next) {
for (size_t i = 0; i < s->listeners.size(); ++i) {
sockname_temp.len = sizeof(struct sockaddr_storage);
if (0 ==
getsockname(sp->fd,
getsockname(s->listeners[i].fd(),
reinterpret_cast<struct sockaddr*>(sockname_temp.addr),
reinterpret_cast<socklen_t*>(&sockname_temp.len))) {
port = grpc_sockaddr_get_port(&sockname_temp);
@ -559,6 +601,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr6_v4mapped;
}
s->handler_factory = handler_factory;
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
if (grpc_sockaddr_is_wildcard(addr, &port)) {
grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
@ -569,8 +612,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
allocated_port1 =
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb,
read_cb, write_cb, orphan_cb);
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@ -593,8 +635,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr4_copy;
}
allocated_port2 =
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb,
read_cb, write_cb, orphan_cb);
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
done:
gpr_free(allocated_addr);
@ -602,52 +643,55 @@ done:
}
int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
grpc_udp_listener* sp;
if (port_index >= s->nports) {
if (port_index >= s->listeners.size()) {
return -1;
}
for (sp = s->head; sp && port_index != 0; sp = sp->next) {
--port_index;
}
GPR_ASSERT(sp); // if this fails, our check earlier was bogus
return sp->fd;
return s->listeners[port_index].fd();
}
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
size_t pollset_count, void* user_data) {
gpr_log(GPR_DEBUG, "grpc_udp_server_start");
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener* sp;
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
s->user_data = user_data;
sp = s->head;
while (sp != nullptr) {
sp->start_cb(sp->emfd, sp->server->user_data);
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
}
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
for(size_t i = 0; i < s->listeners.size(); ++i) {
s->listeners[i].StartListening(pollsets, pollset_count, s->handler_factory);
}
GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
grpc_schedule_on_exec_ctx);
sp->notify_on_write_armed = true;
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
gpr_mu_unlock(&s->mu);
}
/* Registered for both read and write callbacks: increment active_ports
* twice to account for this, and delay free-ing of memory until both
* on_read and on_write have fired. */
s->active_ports += 2;
void GrpcUdpListener::StartListening(grpc_pollset** pollsets,
size_t pollset_count,
GrpcUdpHandlerFactory* handler_factory) {
gpr_mu_lock(&mutex_);
handler_factory_ = handler_factory;
udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data);
for (size_t i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], emfd_);
}
GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(emfd_, &read_closure_);
GRPC_CLOSURE_INIT(&write_closure_, on_write, this, grpc_schedule_on_exec_ctx);
notify_on_write_armed_ = true;
grpc_fd_notify_on_write(emfd_, &write_closure_);
/* Registered for both read and write callbacks: increment active_ports
* twice to account for this, and delay free-ing of memory until both
* on_read and on_write have fired. */
server_->active_ports += 2;
gpr_mu_unlock(&mutex_);
}
sp = sp->next;
void GrpcUdpListener::OnDestroy() {
if (udp_handler_ != nullptr) {
handler_factory_->DestroyUdpHandler(udp_handler_);
}
gpr_mu_unlock(&s->mu);
}
#endif

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -32,22 +33,45 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
/* Called when grpc server starts to listening on the grpc_fd. */
typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data);
/* Called when data is available to read from the socket.
* Return true if there is more data to read from fd. */
typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd);
/* Called when the socket is writeable. The given closure should be scheduled
* when the socket becomes blocked next time. */
typedef void (*grpc_udp_server_write_cb)(grpc_fd* emfd, void* user_data,
grpc_closure* notify_on_write_closure);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_fd* emfd,
grpc_closure* shutdown_fd_callback,
void* user_data);
/* An interface associated with a socket. udp server delivers I/O event on that
* socket to the subclass of this interface which is created through
* GrpcUdpHandlerFactory.
* Its implementation should do the real IO work, e.g. read packet and write. */
class GrpcUdpHandler {
public:
GrpcUdpHandler(grpc_fd* emfd, void* user_data) {}
virtual ~GrpcUdpHandler() {}
// Interfaces to be implemented by subclasses to do the actual setup/tear down
// or I/O.
// Called when data is available to read from the socket. Returns true if
// there is more data to read after this call.
virtual bool Read() GRPC_ABSTRACT;
// Called when socket becomes write unblocked. The given closure should be
// scheduled when the socket becomes blocked next time.
virtual void OnCanWrite(void* user_data,
grpc_closure* notify_on_write_closure) GRPC_ABSTRACT;
// Called before the gRPC FD is orphaned. After return of this method, the
// associated fd will be closed.
virtual void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure,
void* user_data) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
class GrpcUdpHandlerFactory {
public:
virtual ~GrpcUdpHandlerFactory() {}
/* Called when start to listen on a socket.
* Return an instance of the implementation of GrpcUdpHandler interface which
* will process I/O events for this socket from now on. */
virtual GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd,
void* user_data) GRPC_ABSTRACT;
virtual void DestroyUdpHandler(GrpcUdpHandler* handler) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
/* Create a server, initially not bound to any ports */
grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args);
@ -71,10 +95,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb);
GrpcUdpHandlerFactory* handler_factory);
void grpc_udp_server_destroy(grpc_udp_server* server, grpc_closure* on_done);

@ -36,6 +36,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/socket_factory_posix.h"
@ -54,42 +55,71 @@ static int g_number_of_starts = 0;
int rcv_buf_size = 1024;
int snd_buf_size = 1024;
static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; }
class TestGrpcUdpHandler : public GrpcUdpHandler {
public:
TestGrpcUdpHandler(grpc_fd* emfd, void* user_data)
: GrpcUdpHandler(emfd, user_data), emfd_(emfd) {
g_number_of_starts++;
}
~TestGrpcUdpHandler() override {}
static bool on_read(grpc_fd* emfd) {
char read_buffer[512];
ssize_t byte_count;
protected:
bool Read() override {
char read_buffer[512];
ssize_t byte_count;
gpr_mu_lock(g_mu);
byte_count =
recv(grpc_fd_wrapped_fd(emfd), read_buffer, sizeof(read_buffer), 0);
gpr_mu_lock(g_mu);
byte_count =
recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0);
g_number_of_reads++;
g_number_of_bytes_read += static_cast<int>(byte_count);
g_number_of_reads++;
g_number_of_bytes_read += static_cast<int>(byte_count);
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
return false;
}
GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
return false;
}
static void on_write(grpc_fd* emfd, void* user_data,
grpc_closure* notify_on_write_closure) {
gpr_mu_lock(g_mu);
g_number_of_writes++;
void OnCanWrite(void* user_data,
grpc_closure* notify_on_write_closure) override {
gpr_mu_lock(g_mu);
g_number_of_writes++;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure,
void* user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
g_number_of_orphan_calls++;
}
void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure,
void* user_data) override {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd()));
GRPC_CLOSURE_SCHED(orphan_fd_closure, GRPC_ERROR_NONE);
g_number_of_orphan_calls++;
}
grpc_fd* emfd() { return emfd_; }
private:
grpc_fd* emfd_;
};
class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory {
public:
GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override {
gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd));
return grpc_core::New<TestGrpcUdpHandler>(emfd, user_data);
}
void DestroyUdpHandler(GrpcUdpHandler* handler) override {
gpr_log(GPR_INFO, "Destroy handler");
grpc_core::Delete(reinterpret_cast<TestGrpcUdpHandler*>(handler));
}
};
TestGrpcUdpHandlerFactory handler_factory;
struct test_socket_factory {
grpc_socket_factory base;
@ -184,13 +214,12 @@ static void test_no_op_with_port(void) {
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
snd_buf_size, on_start, on_read, on_write,
on_fd_orphaned));
snd_buf_size, &handler_factory));
grpc_udp_server_destroy(s, nullptr);
/* The server had a single FD, which should have been orphaned. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
/* The server haven't start listening, so no udp handler to be notified. */
GPR_ASSERT(g_number_of_orphan_calls == 0);
shutdown_and_destroy_pollset();
}
@ -216,8 +245,7 @@ static void test_no_op_with_port_and_socket_factory(void) {
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
snd_buf_size, on_start, on_read, on_write,
on_fd_orphaned));
snd_buf_size, &handler_factory));
GPR_ASSERT(socket_factory->number_of_socket_calls == 1);
GPR_ASSERT(socket_factory->number_of_bind_calls == 1);
@ -225,8 +253,8 @@ static void test_no_op_with_port_and_socket_factory(void) {
grpc_socket_factory_unref(&socket_factory->base);
/* The server had a single FD, which should have been orphaned. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
/* The server haven't start listening, so no udp handler to be notified. */
GPR_ASSERT(g_number_of_orphan_calls == 0);
shutdown_and_destroy_pollset();
}
@ -244,8 +272,7 @@ static void test_no_op_with_port_and_start(void) {
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
snd_buf_size, on_start, on_read, on_write,
on_fd_orphaned));
snd_buf_size, &handler_factory));
grpc_udp_server_start(s, nullptr, 0, nullptr);
GPR_ASSERT(g_number_of_starts == 1);
@ -278,8 +305,7 @@ static void test_receive(int number_of_clients) {
resolved_addr.len = sizeof(struct sockaddr_storage);
addr->ss_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
snd_buf_size, on_start, on_read, on_write,
on_fd_orphaned));
snd_buf_size, &handler_factory));
svrfd = grpc_udp_server_get_fd(s, 0);
GPR_ASSERT(svrfd >= 0);

Loading…
Cancel
Save