From b293e9e8224d1ebe03cc7ef28782c4148542bff0 Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Tue, 27 Feb 2018 18:27:10 -0500 Subject: [PATCH] Refactors grpc udp_server_listener to be object oriented. Also adds a mutex to each listener. Instead of sharing the mutex in udp_server for all listeners, this per-listener mutex can make most of the call to different listeners in parallel. --- src/core/lib/iomgr/udp_server.cc | 482 ++++++++++++++++------------- src/core/lib/iomgr/udp_server.h | 61 ++-- test/core/iomgr/udp_server_test.cc | 108 ++++--- 3 files changed, 371 insertions(+), 280 deletions(-) diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index ec65497d792..d829c760348 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -52,6 +52,8 @@ #include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" @@ -62,41 +64,104 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" -/* one listening port */ -typedef struct grpc_udp_listener grpc_udp_listener; -struct grpc_udp_listener { - int fd; - grpc_fd* emfd; - grpc_udp_server* server; - grpc_resolved_address addr; - grpc_closure read_closure; - grpc_closure write_closure; +/* A listener which implements basic features of Listening on a port for + * I/O events*/ +class GrpcUdpListener { + public: + GrpcUdpListener(grpc_udp_server* server, int fd, + const grpc_resolved_address* addr); + ~GrpcUdpListener(); + + /* Called when grpc server starts to listening on the grpc_fd. */ + void StartListening(grpc_pollset** pollsets, size_t pollset_count, + GrpcUdpHandlerFactory* handler_factory); + + /* Called when data is available to read from the socket. + * Return true if there is more data to read from fd. */ + void OnRead(grpc_error* error, void* do_read_arg); + + /* Called when the socket is writeable. The given closure should be scheduled + * when the socket becomes blocked next time. */ + void OnCanWrite(grpc_error* error, void* do_write_arg); + + /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ + void OnFdAboutToOrphan(); + + /* Called to orphan fd of this listener.*/ + void OrphanFd(); + + /* Called when this listener is going to be destroyed. */ + void OnDestroy(); + + int fd() const { return fd_; } + + protected: + grpc_fd* emfd() const { return emfd_; } + + gpr_mu* mutex() { return &mutex_; } + + private: + /* event manager callback when reads are ready */ + static void on_read(void* arg, grpc_error* error); + static void on_write(void* arg, grpc_error* error); + + static void do_read(void* arg, grpc_error* error); + static void do_write(void* arg, grpc_error* error); + // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback + // interface. + static void fd_notify_on_write_wrapper(void* arg, grpc_error* error); + + static void shutdown_fd(void* args, grpc_error* error); + + int fd_; + grpc_fd* emfd_; + grpc_udp_server* server_; + grpc_resolved_address addr_; + grpc_closure read_closure_; + grpc_closure write_closure_; // To be called when corresponding QuicGrpcServer closes all active // connections. - grpc_closure orphan_fd_closure; - grpc_closure destroyed_closure; - grpc_udp_server_read_cb read_cb; - grpc_udp_server_write_cb write_cb; - grpc_udp_server_orphan_cb orphan_cb; - grpc_udp_server_start_cb start_cb; - // To be scheduled on another thread to actually read/write. - grpc_closure do_read_closure; - grpc_closure do_write_closure; - grpc_closure notify_on_write_closure; + grpc_closure orphan_fd_closure_; + grpc_closure destroyed_closure_; + // To be scheduled on another thread to actually read/write. + grpc_closure do_read_closure_; + grpc_closure do_write_closure_; + grpc_closure notify_on_write_closure_; // True if orphan_cb is trigered. - bool orphan_notified; + bool orphan_notified_; // True if grpc_fd_notify_on_write() is called after on_write() call. - bool notify_on_write_armed; + bool notify_on_write_armed_; // True if fd has been shutdown. - bool already_shutdown; - - struct grpc_udp_listener* next; + bool already_shutdown_; + // Object actually handles I/O events. Assigned in StartListening(). + GrpcUdpHandler* udp_handler_ = nullptr; + // To be notified on destruction. + GrpcUdpHandlerFactory* handler_factory_ = nullptr; + // Required to access above fields. + gpr_mu mutex_; }; -struct shutdown_fd_args { - grpc_udp_listener* sp; - gpr_mu* server_mu; -}; +GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, + const grpc_resolved_address* addr) + : fd_(fd), + server_(server), + orphan_notified_(false), + already_shutdown_(false) { + char* addr_str; + char* name; + grpc_sockaddr_to_string(&addr_str, addr, 1); + gpr_asprintf(&name, "udp-server-listener:%s", addr_str); + gpr_free(addr_str); + emfd_ = grpc_fd_create(fd, name); + memcpy(&addr_, addr, sizeof(grpc_resolved_address)); + GPR_ASSERT(emfd_); + gpr_free(name); + gpr_mu_init(&mutex_); +} + +GrpcUdpListener::~GrpcUdpListener() { + gpr_mu_destroy(&mutex_); +} /* the overall server */ struct grpc_udp_server { @@ -113,10 +178,11 @@ struct grpc_udp_server { /* is this server shutting down? (boolean) */ int shutdown; - /* linked list of server ports */ - grpc_udp_listener* head; - grpc_udp_listener* tail; - unsigned nports; + /* An array of listeners */ + grpc_core::InlinedVector listeners; + + /* factory for use to create udp listeners */ + GrpcUdpHandlerFactory* handler_factory; /* shutdown callback */ grpc_closure* shutdown_complete; @@ -141,8 +207,7 @@ static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) { } grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { - grpc_udp_server* s = - static_cast(gpr_malloc(sizeof(grpc_udp_server))); + grpc_udp_server* s = grpc_core::New(); gpr_mu_init(&s->mu); s->socket_factory = get_socket_factory(args); if (s->socket_factory) { @@ -151,34 +216,29 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->head = nullptr; - s->tail = nullptr; - s->nports = 0; - return s; } -static void shutdown_fd(void* args, grpc_error* error) { - struct shutdown_fd_args* shutdown_args = - static_cast(args); - grpc_udp_listener* sp = shutdown_args->sp; - gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd); - gpr_mu_lock(shutdown_args->server_mu); - grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error)); - sp->already_shutdown = true; - if (!sp->notify_on_write_armed) { +// static +void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) { + if (args == nullptr) { + // No-op if shutdown args are null. + return; + } + auto sp = static_cast(args); + gpr_mu_lock(sp->mutex()); + gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd_); + grpc_fd_shutdown(sp->emfd_, GRPC_ERROR_REF(error)); + sp->already_shutdown_ = true; + if (!sp->notify_on_write_armed_) { // Re-arm write notification to notify listener with error. This is // necessary to decrement active_ports. - sp->notify_on_write_armed = true; - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + sp->notify_on_write_armed_ = true; + grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); } - gpr_mu_unlock(shutdown_args->server_mu); - gpr_free(shutdown_args); + gpr_mu_unlock(sp->mutex()); } -static void dummy_cb(void* arg, grpc_error* error) { - // No-op. -} static void finish_shutdown(grpc_udp_server* s) { if (s->shutdown_complete != nullptr) { @@ -188,24 +248,23 @@ static void finish_shutdown(grpc_udp_server* s) { gpr_mu_destroy(&s->mu); gpr_log(GPR_DEBUG, "Destroy all listeners."); - while (s->head) { - grpc_udp_listener* sp = s->head; - s->head = sp->next; - gpr_free(sp); + for(size_t i = 0; i < s->listeners.size(); ++i) { + s->listeners[i].OnDestroy(); } if (s->socket_factory) { grpc_socket_factory_unref(s->socket_factory); } - gpr_free(s); + grpc_core::Delete(s); + } static void destroyed_port(void* server, grpc_error* error) { grpc_udp_server* s = static_cast(server); gpr_mu_lock(&s->mu); s->destroyed_ports++; - if (s->destroyed_ports == s->nports) { + if (s->destroyed_ports == s->listeners.size()) { gpr_mu_unlock(&s->mu); finish_shutdown(s); } else { @@ -222,35 +281,30 @@ static void deactivated_all_ports(grpc_udp_server* s) { GPR_ASSERT(s->shutdown); - if (s->head) { - grpc_udp_listener* sp; - for (sp = s->head; sp; sp = sp->next) { - grpc_unlink_if_unix_domain_socket(&sp->addr); - - GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, - grpc_schedule_on_exec_ctx); - if (!sp->orphan_notified) { - /* Call the orphan_cb to signal that the FD is about to be closed and - * should no longer be used. Because at this point, all listening ports - * have been shutdown already, no need to shutdown again.*/ - GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp, - grpc_schedule_on_exec_ctx); - GPR_ASSERT(sp->orphan_cb); - gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd); - sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); - } - grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, - false /* already_closed */, "udp_listener_shutdown"); - } - gpr_mu_unlock(&s->mu); - } else { - gpr_mu_unlock(&s->mu); + if (s->listeners.size() == 0) { + gpr_mu_unlock(&s->mu); finish_shutdown(s); + return; + } + for (size_t i = 0; i < s->listeners.size(); ++i) { + s->listeners[i].OrphanFd(); } + gpr_mu_unlock(&s->mu); +} + +void GrpcUdpListener::OrphanFd() { + gpr_log(GPR_DEBUG, "Orphan fd %d, emfd %p", fd_, emfd_); + grpc_unlink_if_unix_domain_socket(&addr_); + + GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, + grpc_schedule_on_exec_ctx); + /* Because at this point, all listening sockets have been shutdown already, no + * need to call OnFdAboutToOrphan() to notify the handler again. */ + grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, + false /* already_closed */, "udp_listener_shutdown"); } void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { - grpc_udp_listener* sp; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -261,16 +315,9 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { gpr_log(GPR_DEBUG, "start to destroy udp_server"); /* shutdown all fd's */ if (s->active_ports) { - for (sp = s->head; sp; sp = sp->next) { - GPR_ASSERT(sp->orphan_cb); - struct shutdown_fd_args* args = - static_cast(gpr_malloc(sizeof(*args))); - args->sp = sp; - args->server_mu = &s->mu; - GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, - grpc_schedule_on_exec_ctx); - sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); - sp->orphan_notified = true; + for (size_t i = 0; i < s->listeners.size(); ++i) { + GrpcUdpListener* sp = &s->listeners[i]; + sp->OnFdAboutToOrphan(); } gpr_mu_unlock(&s->mu); } else { @@ -279,6 +326,24 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { } } +void GrpcUdpListener::OnFdAboutToOrphan() { + gpr_mu_lock(&mutex_); + grpc_unlink_if_unix_domain_socket(&addr_); + + GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, + grpc_schedule_on_exec_ctx); + if (!orphan_notified_ && udp_handler_ != nullptr) { + /* Singals udp_handler that the FD is about to be closed and + * should no longer be used. */ + GRPC_CLOSURE_INIT(&orphan_fd_closure_, shutdown_fd, this, + grpc_schedule_on_exec_ctx); + gpr_log(GPR_DEBUG, "fd %d about to be orphaned", fd_); + udp_handler_->OnFdAboutToOrphan(&orphan_fd_closure_, server_->user_data); + orphan_notified_ = true; + } + gpr_mu_unlock(&mutex_); +} + static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, const grpc_resolved_address* addr) { return (socket_factory != nullptr) @@ -364,163 +429,140 @@ error: return -1; } -static void do_read(void* arg, grpc_error* error) { - grpc_udp_listener* sp = reinterpret_cast(arg); - GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE); +// static +void GrpcUdpListener::do_read(void* arg, grpc_error* error) { + GrpcUdpListener* sp = reinterpret_cast(arg); + GPR_ASSERT(error == GRPC_ERROR_NONE); /* TODO: the reason we hold server->mu here is merely to prevent fd * shutdown while we are reading. However, it blocks do_write(). Switch to * read lock if available. */ - gpr_mu_lock(&sp->server->mu); + gpr_mu_lock(sp->mutex()); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { + if (!sp->already_shutdown_ && sp->udp_handler_->Read()) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE); } else { /* Finish reading all the packets, re-arm the notification event so we can * get another chance to read. Or fd already shutdown, re-arm to get a * notification with shutdown error. */ - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd_, &sp->read_closure_); } - gpr_mu_unlock(&sp->server->mu); + gpr_mu_unlock(sp->mutex()); } -/* event manager callback when reads are ready */ -static void on_read(void* arg, grpc_error* error) { - grpc_udp_listener* sp = static_cast(arg); +// static +void GrpcUdpListener::on_read(void* arg, grpc_error* error) { + GrpcUdpListener* sp = static_cast(arg); + sp->OnRead(error, arg); +} - gpr_mu_lock(&sp->server->mu); +void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) { if (error != GRPC_ERROR_NONE) { - if (0 == --sp->server->active_ports && sp->server->shutdown) { - gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server); + gpr_mu_lock(&server_->mu); + if (0 == --server_->active_ports && server_->shutdown) { + gpr_mu_unlock(&server_->mu); + deactivated_all_ports(server_); } else { - gpr_mu_unlock(&sp->server->mu); + gpr_mu_unlock(&server_->mu); } return; } + /* Read once. If there is more data to read, off load the work to another * thread to finish. */ - GPR_ASSERT(sp->read_cb); - if (sp->read_cb(sp->emfd)) { + if (udp_handler_->Read()) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, + GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE); } else { /* Finish reading all the packets, re-arm the notification event so we can * get another chance to read. Or fd already shutdown, re-arm to get a * notification with shutdown error. */ - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(emfd_, &read_closure_); } - gpr_mu_unlock(&sp->server->mu); } +// static // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface. -void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { - grpc_udp_listener* sp = reinterpret_cast(arg); - gpr_mu_lock(&sp->server->mu); - if (!sp->notify_on_write_armed) { - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); - sp->notify_on_write_armed = true; - } - gpr_mu_unlock(&sp->server->mu); +void GrpcUdpListener::fd_notify_on_write_wrapper(void* arg, grpc_error* error) { + GrpcUdpListener* sp = reinterpret_cast(arg); + gpr_mu_lock(sp->mutex()); + if (!sp->notify_on_write_armed_) { + grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); + sp->notify_on_write_armed_ = true; + } + gpr_mu_unlock(sp->mutex()); } -static void do_write(void* arg, grpc_error* error) { - grpc_udp_listener* sp = reinterpret_cast(arg); - gpr_mu_lock(&sp->server->mu); - if (sp->already_shutdown) { +// static +void GrpcUdpListener::do_write(void* arg, grpc_error* error) { + GrpcUdpListener* sp = reinterpret_cast(arg); + gpr_mu_lock(sp->mutex()); + if (sp->already_shutdown_) { // If fd has been shutdown, don't write any more and re-arm notification. - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); } else { - sp->notify_on_write_armed = false; + sp->notify_on_write_armed_ = false; /* Tell the registered callback that the socket is writeable. */ - GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE); - GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper, + GPR_ASSERT(error == GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&sp->notify_on_write_closure_, fd_notify_on_write_wrapper, arg, grpc_schedule_on_exec_ctx); - sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure); + sp->udp_handler_->OnCanWrite(sp->server_->user_data, + &sp->notify_on_write_closure_); } - gpr_mu_unlock(&sp->server->mu); + gpr_mu_unlock(sp->mutex()); } -static void on_write(void* arg, grpc_error* error) { - grpc_udp_listener* sp = static_cast(arg); +// static +void GrpcUdpListener::on_write(void* arg, grpc_error* error) { + GrpcUdpListener* sp = static_cast(arg); + sp->OnCanWrite(error, arg); +} - gpr_mu_lock(&sp->server->mu); +void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) { if (error != GRPC_ERROR_NONE) { - if (0 == --sp->server->active_ports && sp->server->shutdown) { - gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server); + gpr_mu_lock(&server_->mu); + if (0 == --server_->active_ports && server_->shutdown) { + gpr_mu_unlock(&server_->mu); + deactivated_all_ports(server_); } else { - gpr_mu_unlock(&sp->server->mu); + gpr_mu_unlock(&server_->mu); } return; } /* Schedule actual write in another thread. */ - GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg, + GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE); - gpr_mu_unlock(&sp->server->mu); + GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE); } static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, - int rcv_buf_size, int snd_buf_size, - grpc_udp_server_start_cb start_cb, - grpc_udp_server_read_cb read_cb, - grpc_udp_server_write_cb write_cb, - grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener* sp; - int port; - char* addr_str; - char* name; + int rcv_buf_size, int snd_buf_size) { + gpr_log(GPR_DEBUG, "add socket %d to server", fd); - port = + int port = prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size); if (port >= 0) { - grpc_sockaddr_to_string(&addr_str, addr, 1); - gpr_asprintf(&name, "udp-server-listener:%s", addr_str); - gpr_free(addr_str); gpr_mu_lock(&s->mu); - s->nports++; - sp = static_cast(gpr_malloc(sizeof(grpc_udp_listener))); - sp->next = nullptr; - if (s->head == nullptr) { - s->head = sp; - } else { - s->tail->next = sp; - } - s->tail = sp; - sp->server = s; - sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); - memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); - sp->read_cb = read_cb; - sp->write_cb = write_cb; - sp->orphan_cb = orphan_cb; - sp->start_cb = start_cb; - sp->orphan_notified = false; - sp->already_shutdown = false; - GPR_ASSERT(sp->emfd); + s->listeners.emplace_back(s, fd, addr); + gpr_log(GPR_DEBUG, + "add socket %d to server for port %d, %zu listener(s) in total", fd, + port, s->listeners.size()); gpr_mu_unlock(&s->mu); - gpr_free(name); } - return port; } int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, int rcv_buf_size, int snd_buf_size, - grpc_udp_server_start_cb start_cb, - grpc_udp_server_read_cb read_cb, - grpc_udp_server_write_cb write_cb, - grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener* sp; + GrpcUdpHandlerFactory* handler_factory) { int allocated_port1 = -1; int allocated_port2 = -1; int fd; @@ -536,10 +578,10 @@ int grpc_udp_server_add_port(grpc_udp_server* s, /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { - for (sp = s->head; sp; sp = sp->next) { + for (size_t i = 0; i < s->listeners.size(); ++i) { sockname_temp.len = sizeof(struct sockaddr_storage); if (0 == - getsockname(sp->fd, + getsockname(s->listeners[i].fd(), reinterpret_cast(sockname_temp.addr), reinterpret_cast(&sockname_temp.len))) { port = grpc_sockaddr_get_port(&sockname_temp); @@ -559,6 +601,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr6_v4mapped; } + s->handler_factory = handler_factory; /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ if (grpc_sockaddr_is_wildcard(addr, &port)) { grpc_sockaddr_make_wildcards(port, &wild4, &wild6); @@ -569,8 +612,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); allocated_port1 = - add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, - read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -593,8 +635,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, - read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); done: gpr_free(allocated_addr); @@ -602,52 +643,55 @@ done: } int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { - grpc_udp_listener* sp; - if (port_index >= s->nports) { + if (port_index >= s->listeners.size()) { return -1; } - for (sp = s->head; sp && port_index != 0; sp = sp->next) { - --port_index; - } - GPR_ASSERT(sp); // if this fails, our check earlier was bogus - return sp->fd; + return s->listeners[port_index].fd(); } void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, size_t pollset_count, void* user_data) { gpr_log(GPR_DEBUG, "grpc_udp_server_start"); - size_t i; gpr_mu_lock(&s->mu); - grpc_udp_listener* sp; GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; s->user_data = user_data; - sp = s->head; - while (sp != nullptr) { - sp->start_cb(sp->emfd, sp->server->user_data); - for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(pollsets[i], sp->emfd); - } - GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, - grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + for(size_t i = 0; i < s->listeners.size(); ++i) { + s->listeners[i].StartListening(pollsets, pollset_count, s->handler_factory); + } - GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, - grpc_schedule_on_exec_ctx); - sp->notify_on_write_armed = true; - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + gpr_mu_unlock(&s->mu); +} - /* Registered for both read and write callbacks: increment active_ports - * twice to account for this, and delay free-ing of memory until both - * on_read and on_write have fired. */ - s->active_ports += 2; +void GrpcUdpListener::StartListening(grpc_pollset** pollsets, + size_t pollset_count, + GrpcUdpHandlerFactory* handler_factory) { + gpr_mu_lock(&mutex_); + handler_factory_ = handler_factory; + udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data); + for (size_t i = 0; i < pollset_count; i++) { + grpc_pollset_add_fd(pollsets[i], emfd_); + } + GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_read(emfd_, &read_closure_); + + GRPC_CLOSURE_INIT(&write_closure_, on_write, this, grpc_schedule_on_exec_ctx); + notify_on_write_armed_ = true; + grpc_fd_notify_on_write(emfd_, &write_closure_); + + /* Registered for both read and write callbacks: increment active_ports + * twice to account for this, and delay free-ing of memory until both + * on_read and on_write have fired. */ + server_->active_ports += 2; + gpr_mu_unlock(&mutex_); +} - sp = sp->next; +void GrpcUdpListener::OnDestroy() { + if (udp_handler_ != nullptr) { + handler_factory_->DestroyUdpHandler(udp_handler_); } - - gpr_mu_unlock(&s->mu); } #endif diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 1be4d04dbb3..860a3fbbb20 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -21,6 +21,7 @@ #include +#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -32,22 +33,45 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; -/* Called when grpc server starts to listening on the grpc_fd. */ -typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data); - -/* Called when data is available to read from the socket. - * Return true if there is more data to read from fd. */ -typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd); - -/* Called when the socket is writeable. The given closure should be scheduled - * when the socket becomes blocked next time. */ -typedef void (*grpc_udp_server_write_cb)(grpc_fd* emfd, void* user_data, - grpc_closure* notify_on_write_closure); - -/* Called when the grpc_fd is about to be orphaned (and the FD closed). */ -typedef void (*grpc_udp_server_orphan_cb)(grpc_fd* emfd, - grpc_closure* shutdown_fd_callback, - void* user_data); +/* An interface associated with a socket. udp server delivers I/O event on that + * socket to the subclass of this interface which is created through + * GrpcUdpHandlerFactory. + * Its implementation should do the real IO work, e.g. read packet and write. */ +class GrpcUdpHandler { + public: + GrpcUdpHandler(grpc_fd* emfd, void* user_data) {} + virtual ~GrpcUdpHandler() {} + + // Interfaces to be implemented by subclasses to do the actual setup/tear down + // or I/O. + + // Called when data is available to read from the socket. Returns true if + // there is more data to read after this call. + virtual bool Read() GRPC_ABSTRACT; + // Called when socket becomes write unblocked. The given closure should be + // scheduled when the socket becomes blocked next time. + virtual void OnCanWrite(void* user_data, + grpc_closure* notify_on_write_closure) GRPC_ABSTRACT; + // Called before the gRPC FD is orphaned. After return of this method, the + // associated fd will be closed. + virtual void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure, + void* user_data) GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; + +class GrpcUdpHandlerFactory { + public: + virtual ~GrpcUdpHandlerFactory() {} + /* Called when start to listen on a socket. + * Return an instance of the implementation of GrpcUdpHandler interface which + * will process I/O events for this socket from now on. */ + virtual GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, + void* user_data) GRPC_ABSTRACT; + virtual void DestroyUdpHandler(GrpcUdpHandler* handler) GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; /* Create a server, initially not bound to any ports */ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args); @@ -71,10 +95,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, int rcv_buf_size, int snd_buf_size, - grpc_udp_server_start_cb start_cb, - grpc_udp_server_read_cb read_cb, - grpc_udp_server_write_cb write_cb, - grpc_udp_server_orphan_cb orphan_cb); + GrpcUdpHandlerFactory* handler_factory); void grpc_udp_server_destroy(grpc_udp_server* server, grpc_closure* on_done); diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 13cbf2f6dff..50e79a08b01 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -36,6 +36,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/socket_factory_posix.h" @@ -54,42 +55,71 @@ static int g_number_of_starts = 0; int rcv_buf_size = 1024; int snd_buf_size = 1024; -static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; } +class TestGrpcUdpHandler : public GrpcUdpHandler { + public: + TestGrpcUdpHandler(grpc_fd* emfd, void* user_data) + : GrpcUdpHandler(emfd, user_data), emfd_(emfd) { + g_number_of_starts++; + } + ~TestGrpcUdpHandler() override {} -static bool on_read(grpc_fd* emfd) { - char read_buffer[512]; - ssize_t byte_count; + protected: + bool Read() override { + char read_buffer[512]; + ssize_t byte_count; - gpr_mu_lock(g_mu); - byte_count = - recv(grpc_fd_wrapped_fd(emfd), read_buffer, sizeof(read_buffer), 0); + gpr_mu_lock(g_mu); + byte_count = + recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0); - g_number_of_reads++; - g_number_of_bytes_read += static_cast(byte_count); + g_number_of_reads++; + g_number_of_bytes_read += static_cast(byte_count); - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); - gpr_mu_unlock(g_mu); - return false; -} + GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(g_pollset, nullptr))); + gpr_mu_unlock(g_mu); + return false; + } -static void on_write(grpc_fd* emfd, void* user_data, - grpc_closure* notify_on_write_closure) { - gpr_mu_lock(g_mu); - g_number_of_writes++; + void OnCanWrite(void* user_data, + grpc_closure* notify_on_write_closure) override { + gpr_mu_lock(g_mu); + g_number_of_writes++; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); - gpr_mu_unlock(g_mu); -} + GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(g_pollset, nullptr))); + gpr_mu_unlock(g_mu); + } -static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure, - void* user_data) { - gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", - grpc_fd_wrapped_fd(emfd)); - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); - g_number_of_orphan_calls++; -} + void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure, + void* user_data) override { + gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", + grpc_fd_wrapped_fd(emfd())); + GRPC_CLOSURE_SCHED(orphan_fd_closure, GRPC_ERROR_NONE); + g_number_of_orphan_calls++; + } + + grpc_fd* emfd() { return emfd_; } + + private: + grpc_fd* emfd_; +}; + +class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory { + public: + GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override { + gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd)); + return grpc_core::New(emfd, user_data); + } + + void DestroyUdpHandler(GrpcUdpHandler* handler) override { + gpr_log(GPR_INFO, "Destroy handler"); + grpc_core::Delete(reinterpret_cast(handler)); + + } +}; + +TestGrpcUdpHandlerFactory handler_factory; struct test_socket_factory { grpc_socket_factory base; @@ -184,13 +214,12 @@ static void test_no_op_with_port(void) { resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, on_start, on_read, on_write, - on_fd_orphaned)); + snd_buf_size, &handler_factory)); grpc_udp_server_destroy(s, nullptr); - /* The server had a single FD, which should have been orphaned. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + /* The server haven't start listening, so no udp handler to be notified. */ + GPR_ASSERT(g_number_of_orphan_calls == 0); shutdown_and_destroy_pollset(); } @@ -216,8 +245,7 @@ static void test_no_op_with_port_and_socket_factory(void) { resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, on_start, on_read, on_write, - on_fd_orphaned)); + snd_buf_size, &handler_factory)); GPR_ASSERT(socket_factory->number_of_socket_calls == 1); GPR_ASSERT(socket_factory->number_of_bind_calls == 1); @@ -225,8 +253,8 @@ static void test_no_op_with_port_and_socket_factory(void) { grpc_socket_factory_unref(&socket_factory->base); - /* The server had a single FD, which should have been orphaned. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + /* The server haven't start listening, so no udp handler to be notified. */ + GPR_ASSERT(g_number_of_orphan_calls == 0); shutdown_and_destroy_pollset(); } @@ -244,8 +272,7 @@ static void test_no_op_with_port_and_start(void) { resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, on_start, on_read, on_write, - on_fd_orphaned)); + snd_buf_size, &handler_factory)); grpc_udp_server_start(s, nullptr, 0, nullptr); GPR_ASSERT(g_number_of_starts == 1); @@ -278,8 +305,7 @@ static void test_receive(int number_of_clients) { resolved_addr.len = sizeof(struct sockaddr_storage); addr->ss_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, on_start, on_read, on_write, - on_fd_orphaned)); + snd_buf_size, &handler_factory)); svrfd = grpc_udp_server_get_fd(s, 0); GPR_ASSERT(svrfd >= 0);