|
|
|
@ -52,6 +52,8 @@ |
|
|
|
|
#include <grpc/support/time.h> |
|
|
|
|
#include "src/core/lib/channel/channel_args.h" |
|
|
|
|
#include "src/core/lib/gpr/string.h" |
|
|
|
|
#include "src/core/lib/gprpp/inlined_vector.h" |
|
|
|
|
#include "src/core/lib/gprpp/memory.h" |
|
|
|
|
#include "src/core/lib/iomgr/error.h" |
|
|
|
|
#include "src/core/lib/iomgr/ev_posix.h" |
|
|
|
|
#include "src/core/lib/iomgr/executor.h" |
|
|
|
@ -62,41 +64,102 @@ |
|
|
|
|
#include "src/core/lib/iomgr/socket_utils_posix.h" |
|
|
|
|
#include "src/core/lib/iomgr/unix_sockets_posix.h" |
|
|
|
|
|
|
|
|
|
/* one listening port */ |
|
|
|
|
typedef struct grpc_udp_listener grpc_udp_listener; |
|
|
|
|
struct grpc_udp_listener { |
|
|
|
|
int fd; |
|
|
|
|
grpc_fd* emfd; |
|
|
|
|
grpc_udp_server* server; |
|
|
|
|
grpc_resolved_address addr; |
|
|
|
|
grpc_closure read_closure; |
|
|
|
|
grpc_closure write_closure; |
|
|
|
|
/* A listener which implements basic features of Listening on a port for
|
|
|
|
|
* I/O events*/ |
|
|
|
|
class GrpcUdpListener { |
|
|
|
|
public: |
|
|
|
|
GrpcUdpListener(grpc_udp_server* server, int fd, |
|
|
|
|
const grpc_resolved_address* addr); |
|
|
|
|
~GrpcUdpListener(); |
|
|
|
|
|
|
|
|
|
/* Called when grpc server starts to listening on the grpc_fd. */ |
|
|
|
|
void StartListening(grpc_pollset** pollsets, size_t pollset_count, |
|
|
|
|
GrpcUdpHandlerFactory* handler_factory); |
|
|
|
|
|
|
|
|
|
/* Called when data is available to read from the socket.
|
|
|
|
|
* Return true if there is more data to read from fd. */ |
|
|
|
|
void OnRead(grpc_error* error, void* do_read_arg); |
|
|
|
|
|
|
|
|
|
/* Called when the socket is writeable. The given closure should be scheduled
|
|
|
|
|
* when the socket becomes blocked next time. */ |
|
|
|
|
void OnCanWrite(grpc_error* error, void* do_write_arg); |
|
|
|
|
|
|
|
|
|
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */ |
|
|
|
|
void OnFdAboutToOrphan(); |
|
|
|
|
|
|
|
|
|
/* Called to orphan fd of this listener.*/ |
|
|
|
|
void OrphanFd(); |
|
|
|
|
|
|
|
|
|
/* Called when this listener is going to be destroyed. */ |
|
|
|
|
void OnDestroy(); |
|
|
|
|
|
|
|
|
|
int fd() const { return fd_; } |
|
|
|
|
|
|
|
|
|
protected: |
|
|
|
|
grpc_fd* emfd() const { return emfd_; } |
|
|
|
|
|
|
|
|
|
gpr_mu* mutex() { return &mutex_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
/* event manager callback when reads are ready */ |
|
|
|
|
static void on_read(void* arg, grpc_error* error); |
|
|
|
|
static void on_write(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
static void do_read(void* arg, grpc_error* error); |
|
|
|
|
static void do_write(void* arg, grpc_error* error); |
|
|
|
|
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback
|
|
|
|
|
// interface.
|
|
|
|
|
static void fd_notify_on_write_wrapper(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
static void shutdown_fd(void* args, grpc_error* error); |
|
|
|
|
|
|
|
|
|
int fd_; |
|
|
|
|
grpc_fd* emfd_; |
|
|
|
|
grpc_udp_server* server_; |
|
|
|
|
grpc_resolved_address addr_; |
|
|
|
|
grpc_closure read_closure_; |
|
|
|
|
grpc_closure write_closure_; |
|
|
|
|
// To be called when corresponding QuicGrpcServer closes all active
|
|
|
|
|
// connections.
|
|
|
|
|
grpc_closure orphan_fd_closure; |
|
|
|
|
grpc_closure destroyed_closure; |
|
|
|
|
grpc_udp_server_read_cb read_cb; |
|
|
|
|
grpc_udp_server_write_cb write_cb; |
|
|
|
|
grpc_udp_server_orphan_cb orphan_cb; |
|
|
|
|
grpc_udp_server_start_cb start_cb; |
|
|
|
|
grpc_closure orphan_fd_closure_; |
|
|
|
|
grpc_closure destroyed_closure_; |
|
|
|
|
// To be scheduled on another thread to actually read/write.
|
|
|
|
|
grpc_closure do_read_closure; |
|
|
|
|
grpc_closure do_write_closure; |
|
|
|
|
grpc_closure notify_on_write_closure; |
|
|
|
|
grpc_closure do_read_closure_; |
|
|
|
|
grpc_closure do_write_closure_; |
|
|
|
|
grpc_closure notify_on_write_closure_; |
|
|
|
|
// True if orphan_cb is trigered.
|
|
|
|
|
bool orphan_notified; |
|
|
|
|
bool orphan_notified_; |
|
|
|
|
// True if grpc_fd_notify_on_write() is called after on_write() call.
|
|
|
|
|
bool notify_on_write_armed; |
|
|
|
|
bool notify_on_write_armed_; |
|
|
|
|
// True if fd has been shutdown.
|
|
|
|
|
bool already_shutdown; |
|
|
|
|
|
|
|
|
|
struct grpc_udp_listener* next; |
|
|
|
|
bool already_shutdown_; |
|
|
|
|
// Object actually handles I/O events. Assigned in StartListening().
|
|
|
|
|
GrpcUdpHandler* udp_handler_ = nullptr; |
|
|
|
|
// To be notified on destruction.
|
|
|
|
|
GrpcUdpHandlerFactory* handler_factory_ = nullptr; |
|
|
|
|
// Required to access above fields.
|
|
|
|
|
gpr_mu mutex_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
struct shutdown_fd_args { |
|
|
|
|
grpc_udp_listener* sp; |
|
|
|
|
gpr_mu* server_mu; |
|
|
|
|
}; |
|
|
|
|
GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, |
|
|
|
|
const grpc_resolved_address* addr) |
|
|
|
|
: fd_(fd), |
|
|
|
|
server_(server), |
|
|
|
|
orphan_notified_(false), |
|
|
|
|
already_shutdown_(false) { |
|
|
|
|
char* addr_str; |
|
|
|
|
char* name; |
|
|
|
|
grpc_sockaddr_to_string(&addr_str, addr, 1); |
|
|
|
|
gpr_asprintf(&name, "udp-server-listener:%s", addr_str); |
|
|
|
|
gpr_free(addr_str); |
|
|
|
|
emfd_ = grpc_fd_create(fd, name); |
|
|
|
|
memcpy(&addr_, addr, sizeof(grpc_resolved_address)); |
|
|
|
|
GPR_ASSERT(emfd_); |
|
|
|
|
gpr_free(name); |
|
|
|
|
gpr_mu_init(&mutex_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GrpcUdpListener::~GrpcUdpListener() { gpr_mu_destroy(&mutex_); } |
|
|
|
|
|
|
|
|
|
/* the overall server */ |
|
|
|
|
struct grpc_udp_server { |
|
|
|
@ -113,10 +176,11 @@ struct grpc_udp_server { |
|
|
|
|
/* is this server shutting down? (boolean) */ |
|
|
|
|
int shutdown; |
|
|
|
|
|
|
|
|
|
/* linked list of server ports */ |
|
|
|
|
grpc_udp_listener* head; |
|
|
|
|
grpc_udp_listener* tail; |
|
|
|
|
unsigned nports; |
|
|
|
|
/* An array of listeners */ |
|
|
|
|
grpc_core::InlinedVector<GrpcUdpListener, 16> listeners; |
|
|
|
|
|
|
|
|
|
/* factory for use to create udp listeners */ |
|
|
|
|
GrpcUdpHandlerFactory* handler_factory; |
|
|
|
|
|
|
|
|
|
/* shutdown callback */ |
|
|
|
|
grpc_closure* shutdown_complete; |
|
|
|
@ -141,8 +205,7 @@ static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { |
|
|
|
|
grpc_udp_server* s = |
|
|
|
|
static_cast<grpc_udp_server*>(gpr_malloc(sizeof(grpc_udp_server))); |
|
|
|
|
grpc_udp_server* s = grpc_core::New<grpc_udp_server>(); |
|
|
|
|
gpr_mu_init(&s->mu); |
|
|
|
|
s->socket_factory = get_socket_factory(args); |
|
|
|
|
if (s->socket_factory) { |
|
|
|
@ -151,33 +214,27 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { |
|
|
|
|
s->active_ports = 0; |
|
|
|
|
s->destroyed_ports = 0; |
|
|
|
|
s->shutdown = 0; |
|
|
|
|
s->head = nullptr; |
|
|
|
|
s->tail = nullptr; |
|
|
|
|
s->nports = 0; |
|
|
|
|
|
|
|
|
|
return s; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void shutdown_fd(void* args, grpc_error* error) { |
|
|
|
|
struct shutdown_fd_args* shutdown_args = |
|
|
|
|
static_cast<struct shutdown_fd_args*>(args); |
|
|
|
|
grpc_udp_listener* sp = shutdown_args->sp; |
|
|
|
|
gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd); |
|
|
|
|
gpr_mu_lock(shutdown_args->server_mu); |
|
|
|
|
grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error)); |
|
|
|
|
sp->already_shutdown = true; |
|
|
|
|
if (!sp->notify_on_write_armed) { |
|
|
|
|
// static
|
|
|
|
|
void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) { |
|
|
|
|
if (args == nullptr) { |
|
|
|
|
// No-op if shutdown args are null.
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
auto sp = static_cast<GrpcUdpListener*>(args); |
|
|
|
|
gpr_mu_lock(sp->mutex()); |
|
|
|
|
gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd_); |
|
|
|
|
grpc_fd_shutdown(sp->emfd_, GRPC_ERROR_REF(error)); |
|
|
|
|
sp->already_shutdown_ = true; |
|
|
|
|
if (!sp->notify_on_write_armed_) { |
|
|
|
|
// Re-arm write notification to notify listener with error. This is
|
|
|
|
|
// necessary to decrement active_ports.
|
|
|
|
|
sp->notify_on_write_armed = true; |
|
|
|
|
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); |
|
|
|
|
sp->notify_on_write_armed_ = true; |
|
|
|
|
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(shutdown_args->server_mu); |
|
|
|
|
gpr_free(shutdown_args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void dummy_cb(void* arg, grpc_error* error) { |
|
|
|
|
// No-op.
|
|
|
|
|
gpr_mu_unlock(sp->mutex()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void finish_shutdown(grpc_udp_server* s) { |
|
|
|
@ -188,24 +245,22 @@ static void finish_shutdown(grpc_udp_server* s) { |
|
|
|
|
gpr_mu_destroy(&s->mu); |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "Destroy all listeners."); |
|
|
|
|
while (s->head) { |
|
|
|
|
grpc_udp_listener* sp = s->head; |
|
|
|
|
s->head = sp->next; |
|
|
|
|
gpr_free(sp); |
|
|
|
|
for (size_t i = 0; i < s->listeners.size(); ++i) { |
|
|
|
|
s->listeners[i].OnDestroy(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (s->socket_factory) { |
|
|
|
|
grpc_socket_factory_unref(s->socket_factory); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_free(s); |
|
|
|
|
grpc_core::Delete(s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void destroyed_port(void* server, grpc_error* error) { |
|
|
|
|
grpc_udp_server* s = static_cast<grpc_udp_server*>(server); |
|
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
s->destroyed_ports++; |
|
|
|
|
if (s->destroyed_ports == s->nports) { |
|
|
|
|
if (s->destroyed_ports == s->listeners.size()) { |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
finish_shutdown(s); |
|
|
|
|
} else { |
|
|
|
@ -222,35 +277,30 @@ static void deactivated_all_ports(grpc_udp_server* s) { |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(s->shutdown); |
|
|
|
|
|
|
|
|
|
if (s->head) { |
|
|
|
|
grpc_udp_listener* sp; |
|
|
|
|
for (sp = s->head; sp; sp = sp->next) { |
|
|
|
|
grpc_unlink_if_unix_domain_socket(&sp->addr); |
|
|
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
if (!sp->orphan_notified) { |
|
|
|
|
/* Call the orphan_cb to signal that the FD is about to be closed and
|
|
|
|
|
* should no longer be used. Because at this point, all listening ports |
|
|
|
|
* have been shutdown already, no need to shutdown again.*/ |
|
|
|
|
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
GPR_ASSERT(sp->orphan_cb); |
|
|
|
|
gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd); |
|
|
|
|
sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); |
|
|
|
|
} |
|
|
|
|
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, |
|
|
|
|
false /* already_closed */, "udp_listener_shutdown"); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
} else { |
|
|
|
|
if (s->listeners.size() == 0) { |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
finish_shutdown(s); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < s->listeners.size(); ++i) { |
|
|
|
|
s->listeners[i].OrphanFd(); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcUdpListener::OrphanFd() { |
|
|
|
|
gpr_log(GPR_DEBUG, "Orphan fd %d, emfd %p", fd_, emfd_); |
|
|
|
|
grpc_unlink_if_unix_domain_socket(&addr_); |
|
|
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
/* Because at this point, all listening sockets have been shutdown already, no
|
|
|
|
|
* need to call OnFdAboutToOrphan() to notify the handler again. */ |
|
|
|
|
grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, |
|
|
|
|
false /* already_closed */, "udp_listener_shutdown"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { |
|
|
|
|
grpc_udp_listener* sp; |
|
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(!s->shutdown); |
|
|
|
@ -261,16 +311,9 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { |
|
|
|
|
gpr_log(GPR_DEBUG, "start to destroy udp_server"); |
|
|
|
|
/* shutdown all fd's */ |
|
|
|
|
if (s->active_ports) { |
|
|
|
|
for (sp = s->head; sp; sp = sp->next) { |
|
|
|
|
GPR_ASSERT(sp->orphan_cb); |
|
|
|
|
struct shutdown_fd_args* args = |
|
|
|
|
static_cast<struct shutdown_fd_args*>(gpr_malloc(sizeof(*args))); |
|
|
|
|
args->sp = sp; |
|
|
|
|
args->server_mu = &s->mu; |
|
|
|
|
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); |
|
|
|
|
sp->orphan_notified = true; |
|
|
|
|
for (size_t i = 0; i < s->listeners.size(); ++i) { |
|
|
|
|
GrpcUdpListener* sp = &s->listeners[i]; |
|
|
|
|
sp->OnFdAboutToOrphan(); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
} else { |
|
|
|
@ -279,6 +322,24 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcUdpListener::OnFdAboutToOrphan() { |
|
|
|
|
gpr_mu_lock(&mutex_); |
|
|
|
|
grpc_unlink_if_unix_domain_socket(&addr_); |
|
|
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
if (!orphan_notified_ && udp_handler_ != nullptr) { |
|
|
|
|
/* Singals udp_handler that the FD is about to be closed and
|
|
|
|
|
* should no longer be used. */ |
|
|
|
|
GRPC_CLOSURE_INIT(&orphan_fd_closure_, shutdown_fd, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
gpr_log(GPR_DEBUG, "fd %d about to be orphaned", fd_); |
|
|
|
|
udp_handler_->OnFdAboutToOrphan(&orphan_fd_closure_, server_->user_data); |
|
|
|
|
orphan_notified_ = true; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&mutex_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, |
|
|
|
|
const grpc_resolved_address* addr) { |
|
|
|
|
return (socket_factory != nullptr) |
|
|
|
@ -364,163 +425,140 @@ error: |
|
|
|
|
return -1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void do_read(void* arg, grpc_error* error) { |
|
|
|
|
grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg); |
|
|
|
|
GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE); |
|
|
|
|
// static
|
|
|
|
|
void GrpcUdpListener::do_read(void* arg, grpc_error* error) { |
|
|
|
|
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
|
|
|
|
GPR_ASSERT(error == GRPC_ERROR_NONE); |
|
|
|
|
/* TODO: the reason we hold server->mu here is merely to prevent fd
|
|
|
|
|
* shutdown while we are reading. However, it blocks do_write(). Switch to |
|
|
|
|
* read lock if available. */ |
|
|
|
|
gpr_mu_lock(&sp->server->mu); |
|
|
|
|
gpr_mu_lock(sp->mutex()); |
|
|
|
|
/* Tell the registered callback that data is available to read. */ |
|
|
|
|
if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { |
|
|
|
|
if (!sp->already_shutdown_ && sp->udp_handler_->Read()) { |
|
|
|
|
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
|
|
|
|
|
* after finishing this event loop. */ |
|
|
|
|
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); |
|
|
|
|
GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE); |
|
|
|
|
} else { |
|
|
|
|
/* Finish reading all the packets, re-arm the notification event so we can
|
|
|
|
|
* get another chance to read. Or fd already shutdown, re-arm to get a |
|
|
|
|
* notification with shutdown error. */ |
|
|
|
|
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); |
|
|
|
|
grpc_fd_notify_on_read(sp->emfd_, &sp->read_closure_); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
gpr_mu_unlock(sp->mutex()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* event manager callback when reads are ready */ |
|
|
|
|
static void on_read(void* arg, grpc_error* error) { |
|
|
|
|
grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg); |
|
|
|
|
// static
|
|
|
|
|
void GrpcUdpListener::on_read(void* arg, grpc_error* error) { |
|
|
|
|
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
|
|
|
|
sp->OnRead(error, arg); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&sp->server->mu); |
|
|
|
|
void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) { |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
if (0 == --sp->server->active_ports && sp->server->shutdown) { |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
deactivated_all_ports(sp->server); |
|
|
|
|
gpr_mu_lock(&server_->mu); |
|
|
|
|
if (0 == --server_->active_ports && server_->shutdown) { |
|
|
|
|
gpr_mu_unlock(&server_->mu); |
|
|
|
|
deactivated_all_ports(server_); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
gpr_mu_unlock(&server_->mu); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Read once. If there is more data to read, off load the work to another
|
|
|
|
|
* thread to finish. */ |
|
|
|
|
GPR_ASSERT(sp->read_cb); |
|
|
|
|
if (sp->read_cb(sp->emfd)) { |
|
|
|
|
if (udp_handler_->Read()) { |
|
|
|
|
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
|
|
|
|
|
* after finishing this event loop. */ |
|
|
|
|
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, |
|
|
|
|
GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, |
|
|
|
|
grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); |
|
|
|
|
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); |
|
|
|
|
GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE); |
|
|
|
|
} else { |
|
|
|
|
/* Finish reading all the packets, re-arm the notification event so we can
|
|
|
|
|
* get another chance to read. Or fd already shutdown, re-arm to get a |
|
|
|
|
* notification with shutdown error. */ |
|
|
|
|
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); |
|
|
|
|
grpc_fd_notify_on_read(emfd_, &read_closure_); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// static
|
|
|
|
|
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
|
|
|
|
|
void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { |
|
|
|
|
grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg); |
|
|
|
|
gpr_mu_lock(&sp->server->mu); |
|
|
|
|
if (!sp->notify_on_write_armed) { |
|
|
|
|
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); |
|
|
|
|
sp->notify_on_write_armed = true; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
void GrpcUdpListener::fd_notify_on_write_wrapper(void* arg, grpc_error* error) { |
|
|
|
|
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
|
|
|
|
gpr_mu_lock(sp->mutex()); |
|
|
|
|
if (!sp->notify_on_write_armed_) { |
|
|
|
|
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); |
|
|
|
|
sp->notify_on_write_armed_ = true; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(sp->mutex()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void do_write(void* arg, grpc_error* error) { |
|
|
|
|
grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg); |
|
|
|
|
gpr_mu_lock(&sp->server->mu); |
|
|
|
|
if (sp->already_shutdown) { |
|
|
|
|
// static
|
|
|
|
|
void GrpcUdpListener::do_write(void* arg, grpc_error* error) { |
|
|
|
|
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
|
|
|
|
gpr_mu_lock(sp->mutex()); |
|
|
|
|
if (sp->already_shutdown_) { |
|
|
|
|
// If fd has been shutdown, don't write any more and re-arm notification.
|
|
|
|
|
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); |
|
|
|
|
grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); |
|
|
|
|
} else { |
|
|
|
|
sp->notify_on_write_armed = false; |
|
|
|
|
sp->notify_on_write_armed_ = false; |
|
|
|
|
/* Tell the registered callback that the socket is writeable. */ |
|
|
|
|
GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE); |
|
|
|
|
GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper, |
|
|
|
|
GPR_ASSERT(error == GRPC_ERROR_NONE); |
|
|
|
|
GRPC_CLOSURE_INIT(&sp->notify_on_write_closure_, fd_notify_on_write_wrapper, |
|
|
|
|
arg, grpc_schedule_on_exec_ctx); |
|
|
|
|
sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure); |
|
|
|
|
sp->udp_handler_->OnCanWrite(sp->server_->user_data, |
|
|
|
|
&sp->notify_on_write_closure_); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
gpr_mu_unlock(sp->mutex()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_write(void* arg, grpc_error* error) { |
|
|
|
|
grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg); |
|
|
|
|
// static
|
|
|
|
|
void GrpcUdpListener::on_write(void* arg, grpc_error* error) { |
|
|
|
|
GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); |
|
|
|
|
sp->OnCanWrite(error, arg); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&sp->server->mu); |
|
|
|
|
void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) { |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
if (0 == --sp->server->active_ports && sp->server->shutdown) { |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
deactivated_all_ports(sp->server); |
|
|
|
|
gpr_mu_lock(&server_->mu); |
|
|
|
|
if (0 == --server_->active_ports && server_->shutdown) { |
|
|
|
|
gpr_mu_unlock(&server_->mu); |
|
|
|
|
deactivated_all_ports(server_); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
gpr_mu_unlock(&server_->mu); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Schedule actual write in another thread. */ |
|
|
|
|
GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg, |
|
|
|
|
GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, |
|
|
|
|
grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); |
|
|
|
|
|
|
|
|
|
GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE); |
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
|
GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int add_socket_to_server(grpc_udp_server* s, int fd, |
|
|
|
|
const grpc_resolved_address* addr, |
|
|
|
|
int rcv_buf_size, int snd_buf_size, |
|
|
|
|
grpc_udp_server_start_cb start_cb, |
|
|
|
|
grpc_udp_server_read_cb read_cb, |
|
|
|
|
grpc_udp_server_write_cb write_cb, |
|
|
|
|
grpc_udp_server_orphan_cb orphan_cb) { |
|
|
|
|
grpc_udp_listener* sp; |
|
|
|
|
int port; |
|
|
|
|
char* addr_str; |
|
|
|
|
char* name; |
|
|
|
|
int rcv_buf_size, int snd_buf_size) { |
|
|
|
|
gpr_log(GPR_DEBUG, "add socket %d to server", fd); |
|
|
|
|
|
|
|
|
|
port = |
|
|
|
|
int port = |
|
|
|
|
prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size); |
|
|
|
|
if (port >= 0) { |
|
|
|
|
grpc_sockaddr_to_string(&addr_str, addr, 1); |
|
|
|
|
gpr_asprintf(&name, "udp-server-listener:%s", addr_str); |
|
|
|
|
gpr_free(addr_str); |
|
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
s->nports++; |
|
|
|
|
sp = static_cast<grpc_udp_listener*>(gpr_malloc(sizeof(grpc_udp_listener))); |
|
|
|
|
sp->next = nullptr; |
|
|
|
|
if (s->head == nullptr) { |
|
|
|
|
s->head = sp; |
|
|
|
|
} else { |
|
|
|
|
s->tail->next = sp; |
|
|
|
|
} |
|
|
|
|
s->tail = sp; |
|
|
|
|
sp->server = s; |
|
|
|
|
sp->fd = fd; |
|
|
|
|
sp->emfd = grpc_fd_create(fd, name); |
|
|
|
|
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); |
|
|
|
|
sp->read_cb = read_cb; |
|
|
|
|
sp->write_cb = write_cb; |
|
|
|
|
sp->orphan_cb = orphan_cb; |
|
|
|
|
sp->start_cb = start_cb; |
|
|
|
|
sp->orphan_notified = false; |
|
|
|
|
sp->already_shutdown = false; |
|
|
|
|
GPR_ASSERT(sp->emfd); |
|
|
|
|
s->listeners.emplace_back(s, fd, addr); |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"add socket %d to server for port %d, %zu listener(s) in total", fd, |
|
|
|
|
port, s->listeners.size()); |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
gpr_free(name); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return port; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_udp_server_add_port(grpc_udp_server* s, |
|
|
|
|
const grpc_resolved_address* addr, |
|
|
|
|
int rcv_buf_size, int snd_buf_size, |
|
|
|
|
grpc_udp_server_start_cb start_cb, |
|
|
|
|
grpc_udp_server_read_cb read_cb, |
|
|
|
|
grpc_udp_server_write_cb write_cb, |
|
|
|
|
grpc_udp_server_orphan_cb orphan_cb) { |
|
|
|
|
grpc_udp_listener* sp; |
|
|
|
|
GrpcUdpHandlerFactory* handler_factory) { |
|
|
|
|
int allocated_port1 = -1; |
|
|
|
|
int allocated_port2 = -1; |
|
|
|
|
int fd; |
|
|
|
@ -536,10 +574,10 @@ int grpc_udp_server_add_port(grpc_udp_server* s, |
|
|
|
|
/* Check if this is a wildcard port, and if so, try to keep the port the same
|
|
|
|
|
as some previously created listener. */ |
|
|
|
|
if (grpc_sockaddr_get_port(addr) == 0) { |
|
|
|
|
for (sp = s->head; sp; sp = sp->next) { |
|
|
|
|
for (size_t i = 0; i < s->listeners.size(); ++i) { |
|
|
|
|
sockname_temp.len = sizeof(struct sockaddr_storage); |
|
|
|
|
if (0 == |
|
|
|
|
getsockname(sp->fd, |
|
|
|
|
getsockname(s->listeners[i].fd(), |
|
|
|
|
reinterpret_cast<struct sockaddr*>(sockname_temp.addr), |
|
|
|
|
reinterpret_cast<socklen_t*>(&sockname_temp.len))) { |
|
|
|
|
port = grpc_sockaddr_get_port(&sockname_temp); |
|
|
|
@ -559,6 +597,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, |
|
|
|
|
addr = &addr6_v4mapped; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s->handler_factory = handler_factory; |
|
|
|
|
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ |
|
|
|
|
if (grpc_sockaddr_is_wildcard(addr, &port)) { |
|
|
|
|
grpc_sockaddr_make_wildcards(port, &wild4, &wild6); |
|
|
|
@ -569,8 +608,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, |
|
|
|
|
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( |
|
|
|
|
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); |
|
|
|
|
allocated_port1 = |
|
|
|
|
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, |
|
|
|
|
read_cb, write_cb, orphan_cb); |
|
|
|
|
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); |
|
|
|
|
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
@ -593,8 +631,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, |
|
|
|
|
addr = &addr4_copy; |
|
|
|
|
} |
|
|
|
|
allocated_port2 = |
|
|
|
|
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, |
|
|
|
|
read_cb, write_cb, orphan_cb); |
|
|
|
|
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); |
|
|
|
|
|
|
|
|
|
done: |
|
|
|
|
gpr_free(allocated_addr); |
|
|
|
@ -602,52 +639,55 @@ done: |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { |
|
|
|
|
grpc_udp_listener* sp; |
|
|
|
|
if (port_index >= s->nports) { |
|
|
|
|
if (port_index >= s->listeners.size()) { |
|
|
|
|
return -1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (sp = s->head; sp && port_index != 0; sp = sp->next) { |
|
|
|
|
--port_index; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(sp); // if this fails, our check earlier was bogus
|
|
|
|
|
return sp->fd; |
|
|
|
|
return s->listeners[port_index].fd(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, |
|
|
|
|
size_t pollset_count, void* user_data) { |
|
|
|
|
gpr_log(GPR_DEBUG, "grpc_udp_server_start"); |
|
|
|
|
size_t i; |
|
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
grpc_udp_listener* sp; |
|
|
|
|
GPR_ASSERT(s->active_ports == 0); |
|
|
|
|
s->pollsets = pollsets; |
|
|
|
|
s->user_data = user_data; |
|
|
|
|
|
|
|
|
|
sp = s->head; |
|
|
|
|
while (sp != nullptr) { |
|
|
|
|
sp->start_cb(sp->emfd, sp->server->user_data); |
|
|
|
|
for (i = 0; i < pollset_count; i++) { |
|
|
|
|
grpc_pollset_add_fd(pollsets[i], sp->emfd); |
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); |
|
|
|
|
for (size_t i = 0; i < s->listeners.size(); ++i) { |
|
|
|
|
s->listeners[i].StartListening(pollsets, pollset_count, s->handler_factory); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
sp->notify_on_write_armed = true; |
|
|
|
|
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Registered for both read and write callbacks: increment active_ports
|
|
|
|
|
* twice to account for this, and delay free-ing of memory until both |
|
|
|
|
* on_read and on_write have fired. */ |
|
|
|
|
s->active_ports += 2; |
|
|
|
|
void GrpcUdpListener::StartListening(grpc_pollset** pollsets, |
|
|
|
|
size_t pollset_count, |
|
|
|
|
GrpcUdpHandlerFactory* handler_factory) { |
|
|
|
|
gpr_mu_lock(&mutex_); |
|
|
|
|
handler_factory_ = handler_factory; |
|
|
|
|
udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data); |
|
|
|
|
for (size_t i = 0; i < pollset_count; i++) { |
|
|
|
|
grpc_pollset_add_fd(pollsets[i], emfd_); |
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_fd_notify_on_read(emfd_, &read_closure_); |
|
|
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&write_closure_, on_write, this, grpc_schedule_on_exec_ctx); |
|
|
|
|
notify_on_write_armed_ = true; |
|
|
|
|
grpc_fd_notify_on_write(emfd_, &write_closure_); |
|
|
|
|
|
|
|
|
|
/* Registered for both read and write callbacks: increment active_ports
|
|
|
|
|
* twice to account for this, and delay free-ing of memory until both |
|
|
|
|
* on_read and on_write have fired. */ |
|
|
|
|
server_->active_ports += 2; |
|
|
|
|
gpr_mu_unlock(&mutex_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sp = sp->next; |
|
|
|
|
void GrpcUdpListener::OnDestroy() { |
|
|
|
|
if (udp_handler_ != nullptr) { |
|
|
|
|
handler_factory_->DestroyUdpHandler(udp_handler_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#endif |
|
|
|
|