|
|
@ -64,12 +64,12 @@ |
|
|
|
#include "src/core/lib/iomgr/resolve_address.h" |
|
|
|
#include "src/core/lib/iomgr/resolve_address.h" |
|
|
|
#include "src/core/lib/iomgr/sockaddr_utils.h" |
|
|
|
#include "src/core/lib/iomgr/sockaddr_utils.h" |
|
|
|
#include "src/core/lib/iomgr/socket_utils_posix.h" |
|
|
|
#include "src/core/lib/iomgr/socket_utils_posix.h" |
|
|
|
|
|
|
|
#include "src/core/lib/iomgr/unix_sockets_posix.h" |
|
|
|
#include "src/core/lib/support/string.h" |
|
|
|
#include "src/core/lib/support/string.h" |
|
|
|
|
|
|
|
|
|
|
|
#define INIT_PORT_CAP 2 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* one listening port */ |
|
|
|
/* one listening port */ |
|
|
|
typedef struct { |
|
|
|
typedef struct grpc_udp_listener grpc_udp_listener; |
|
|
|
|
|
|
|
struct grpc_udp_listener { |
|
|
|
int fd; |
|
|
|
int fd; |
|
|
|
grpc_fd *emfd; |
|
|
|
grpc_fd *emfd; |
|
|
|
grpc_udp_server *server; |
|
|
|
grpc_udp_server *server; |
|
|
@ -82,12 +82,13 @@ typedef struct { |
|
|
|
grpc_closure destroyed_closure; |
|
|
|
grpc_closure destroyed_closure; |
|
|
|
grpc_udp_server_read_cb read_cb; |
|
|
|
grpc_udp_server_read_cb read_cb; |
|
|
|
grpc_udp_server_orphan_cb orphan_cb; |
|
|
|
grpc_udp_server_orphan_cb orphan_cb; |
|
|
|
} server_port; |
|
|
|
|
|
|
|
|
|
|
|
struct grpc_udp_listener *next; |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
/* the overall server */ |
|
|
|
/* the overall server */ |
|
|
|
struct grpc_udp_server { |
|
|
|
struct grpc_udp_server { |
|
|
|
gpr_mu mu; |
|
|
|
gpr_mu mu; |
|
|
|
gpr_cv cv; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* active port count: how many ports are actually still listening */ |
|
|
|
/* active port count: how many ports are actually still listening */ |
|
|
|
size_t active_ports; |
|
|
|
size_t active_ports; |
|
|
@ -97,10 +98,10 @@ struct grpc_udp_server { |
|
|
|
/* is this server shutting down? (boolean) */ |
|
|
|
/* is this server shutting down? (boolean) */ |
|
|
|
int shutdown; |
|
|
|
int shutdown; |
|
|
|
|
|
|
|
|
|
|
|
/* all listening ports */ |
|
|
|
/* linked list of server ports */ |
|
|
|
server_port *ports; |
|
|
|
grpc_udp_listener *head; |
|
|
|
size_t nports; |
|
|
|
grpc_udp_listener *tail; |
|
|
|
size_t port_capacity; |
|
|
|
unsigned nports; |
|
|
|
|
|
|
|
|
|
|
|
/* shutdown callback */ |
|
|
|
/* shutdown callback */ |
|
|
|
grpc_closure *shutdown_complete; |
|
|
|
grpc_closure *shutdown_complete; |
|
|
@ -116,24 +117,29 @@ struct grpc_udp_server { |
|
|
|
grpc_udp_server *grpc_udp_server_create(void) { |
|
|
|
grpc_udp_server *grpc_udp_server_create(void) { |
|
|
|
grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); |
|
|
|
grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); |
|
|
|
gpr_mu_init(&s->mu); |
|
|
|
gpr_mu_init(&s->mu); |
|
|
|
gpr_cv_init(&s->cv); |
|
|
|
|
|
|
|
s->active_ports = 0; |
|
|
|
s->active_ports = 0; |
|
|
|
s->destroyed_ports = 0; |
|
|
|
s->destroyed_ports = 0; |
|
|
|
s->shutdown = 0; |
|
|
|
s->shutdown = 0; |
|
|
|
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); |
|
|
|
s->head = NULL; |
|
|
|
|
|
|
|
s->tail = NULL; |
|
|
|
s->nports = 0; |
|
|
|
s->nports = 0; |
|
|
|
s->port_capacity = INIT_PORT_CAP; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return s; |
|
|
|
return s; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { |
|
|
|
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { |
|
|
|
grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); |
|
|
|
if (s->shutdown_complete != NULL) { |
|
|
|
|
|
|
|
grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_destroy(&s->mu); |
|
|
|
gpr_mu_destroy(&s->mu); |
|
|
|
gpr_cv_destroy(&s->cv); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gpr_free(s->ports); |
|
|
|
while (s->head) { |
|
|
|
|
|
|
|
grpc_udp_listener *sp = s->head; |
|
|
|
|
|
|
|
s->head = sp->next; |
|
|
|
|
|
|
|
gpr_free(sp); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
gpr_free(s); |
|
|
|
gpr_free(s); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -154,8 +160,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, |
|
|
|
events will be received on them - at this point it's safe to destroy |
|
|
|
events will be received on them - at this point it's safe to destroy |
|
|
|
things */ |
|
|
|
things */ |
|
|
|
static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { |
|
|
|
static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { |
|
|
|
size_t i; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* delete ALL the things */ |
|
|
|
/* delete ALL the things */ |
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
|
|
|
|
|
|
@ -164,9 +168,11 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (s->nports) { |
|
|
|
if (s->head) { |
|
|
|
for (i = 0; i < s->nports; i++) { |
|
|
|
grpc_udp_listener *sp; |
|
|
|
server_port *sp = &s->ports[i]; |
|
|
|
for (sp = s->head; sp; sp = sp->next) { |
|
|
|
|
|
|
|
grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); |
|
|
|
|
|
|
|
|
|
|
|
sp->destroyed_closure.cb = destroyed_port; |
|
|
|
sp->destroyed_closure.cb = destroyed_port; |
|
|
|
sp->destroyed_closure.cb_arg = s; |
|
|
|
sp->destroyed_closure.cb_arg = s; |
|
|
|
|
|
|
|
|
|
|
@ -187,7 +193,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { |
|
|
|
|
|
|
|
|
|
|
|
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, |
|
|
|
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, |
|
|
|
grpc_closure *on_done) { |
|
|
|
grpc_closure *on_done) { |
|
|
|
size_t i; |
|
|
|
grpc_udp_listener *sp; |
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
|
|
|
|
|
|
|
GPR_ASSERT(!s->shutdown); |
|
|
|
GPR_ASSERT(!s->shutdown); |
|
|
@ -197,14 +203,10 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, |
|
|
|
|
|
|
|
|
|
|
|
/* shutdown all fd's */ |
|
|
|
/* shutdown all fd's */ |
|
|
|
if (s->active_ports) { |
|
|
|
if (s->active_ports) { |
|
|
|
for (i = 0; i < s->nports; i++) { |
|
|
|
for (sp = s->head; sp; sp = sp->next) { |
|
|
|
server_port *sp = &s->ports[i]; |
|
|
|
|
|
|
|
/* Call the orphan_cb to signal that the FD is about to be closed and
|
|
|
|
|
|
|
|
* should no longer be used. */ |
|
|
|
|
|
|
|
GPR_ASSERT(sp->orphan_cb); |
|
|
|
GPR_ASSERT(sp->orphan_cb); |
|
|
|
sp->orphan_cb(sp->emfd); |
|
|
|
sp->orphan_cb(sp->emfd); |
|
|
|
|
|
|
|
grpc_fd_shutdown(exec_ctx, sp->emfd); |
|
|
|
grpc_fd_shutdown(exec_ctx, s->ports[i].emfd); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
} else { |
|
|
|
} else { |
|
|
@ -281,10 +283,10 @@ error: |
|
|
|
|
|
|
|
|
|
|
|
/* event manager callback when reads are ready */ |
|
|
|
/* event manager callback when reads are ready */ |
|
|
|
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
server_port *sp = arg; |
|
|
|
grpc_udp_listener *sp = arg; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&sp->server->mu); |
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
gpr_mu_lock(&sp->server->mu); |
|
|
|
|
|
|
|
if (0 == --sp->server->active_ports) { |
|
|
|
if (0 == --sp->server->active_ports) { |
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
deactivated_all_ports(exec_ctx, sp->server); |
|
|
|
deactivated_all_ports(exec_ctx, sp->server); |
|
|
@ -300,13 +302,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
|
|
|
|
|
|
|
|
/* Re-arm the notification event so we get another chance to read. */ |
|
|
|
/* Re-arm the notification event so we get another chance to read. */ |
|
|
|
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); |
|
|
|
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); |
|
|
|
|
|
|
|
gpr_mu_unlock(&sp->server->mu); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static int add_socket_to_server(grpc_udp_server *s, int fd, |
|
|
|
static int add_socket_to_server(grpc_udp_server *s, int fd, |
|
|
|
const struct sockaddr *addr, size_t addr_len, |
|
|
|
const struct sockaddr *addr, size_t addr_len, |
|
|
|
grpc_udp_server_read_cb read_cb, |
|
|
|
grpc_udp_server_read_cb read_cb, |
|
|
|
grpc_udp_server_orphan_cb orphan_cb) { |
|
|
|
grpc_udp_server_orphan_cb orphan_cb) { |
|
|
|
server_port *sp; |
|
|
|
grpc_udp_listener *sp; |
|
|
|
int port; |
|
|
|
int port; |
|
|
|
char *addr_str; |
|
|
|
char *addr_str; |
|
|
|
char *name; |
|
|
|
char *name; |
|
|
@ -317,12 +320,15 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, |
|
|
|
gpr_asprintf(&name, "udp-server-listener:%s", addr_str); |
|
|
|
gpr_asprintf(&name, "udp-server-listener:%s", addr_str); |
|
|
|
gpr_free(addr_str); |
|
|
|
gpr_free(addr_str); |
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
/* append it to the list under a lock */ |
|
|
|
s->nports++; |
|
|
|
if (s->nports == s->port_capacity) { |
|
|
|
sp = gpr_malloc(sizeof(grpc_udp_listener)); |
|
|
|
s->port_capacity *= 2; |
|
|
|
sp->next = NULL; |
|
|
|
s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); |
|
|
|
if (s->head == NULL) { |
|
|
|
|
|
|
|
s->head = sp; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
s->tail->next = sp; |
|
|
|
} |
|
|
|
} |
|
|
|
sp = &s->ports[s->nports++]; |
|
|
|
s->tail = sp; |
|
|
|
sp->server = s; |
|
|
|
sp->server = s; |
|
|
|
sp->fd = fd; |
|
|
|
sp->fd = fd; |
|
|
|
sp->emfd = grpc_fd_create(fd, name); |
|
|
|
sp->emfd = grpc_fd_create(fd, name); |
|
|
@ -341,9 +347,9 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, |
|
|
|
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, |
|
|
|
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, |
|
|
|
size_t addr_len, grpc_udp_server_read_cb read_cb, |
|
|
|
size_t addr_len, grpc_udp_server_read_cb read_cb, |
|
|
|
grpc_udp_server_orphan_cb orphan_cb) { |
|
|
|
grpc_udp_server_orphan_cb orphan_cb) { |
|
|
|
|
|
|
|
grpc_udp_listener *sp; |
|
|
|
int allocated_port1 = -1; |
|
|
|
int allocated_port1 = -1; |
|
|
|
int allocated_port2 = -1; |
|
|
|
int allocated_port2 = -1; |
|
|
|
unsigned i; |
|
|
|
|
|
|
|
int fd; |
|
|
|
int fd; |
|
|
|
grpc_dualstack_mode dsmode; |
|
|
|
grpc_dualstack_mode dsmode; |
|
|
|
struct sockaddr_in6 addr6_v4mapped; |
|
|
|
struct sockaddr_in6 addr6_v4mapped; |
|
|
@ -358,9 +364,9 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, |
|
|
|
/* Check if this is a wildcard port, and if so, try to keep the port the same
|
|
|
|
/* Check if this is a wildcard port, and if so, try to keep the port the same
|
|
|
|
as some previously created listener. */ |
|
|
|
as some previously created listener. */ |
|
|
|
if (grpc_sockaddr_get_port(addr) == 0) { |
|
|
|
if (grpc_sockaddr_get_port(addr) == 0) { |
|
|
|
for (i = 0; i < s->nports; i++) { |
|
|
|
for (sp = s->head; sp; sp = sp->next) { |
|
|
|
sockname_len = sizeof(sockname_temp); |
|
|
|
sockname_len = sizeof(sockname_temp); |
|
|
|
if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, |
|
|
|
if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp, |
|
|
|
&sockname_len)) { |
|
|
|
&sockname_len)) { |
|
|
|
port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); |
|
|
|
port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); |
|
|
|
if (port > 0) { |
|
|
|
if (port > 0) { |
|
|
@ -421,27 +427,40 @@ done: |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { |
|
|
|
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { |
|
|
|
return (port_index < s->nports) ? s->ports[port_index].fd : -1; |
|
|
|
grpc_udp_listener *sp; |
|
|
|
|
|
|
|
if (port_index >= s->nports) { |
|
|
|
|
|
|
|
return -1; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (sp = s->head; sp && port_index != 0; sp = sp->next) { |
|
|
|
|
|
|
|
--port_index; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return sp->fd; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, |
|
|
|
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, |
|
|
|
grpc_pollset **pollsets, size_t pollset_count, |
|
|
|
grpc_pollset **pollsets, size_t pollset_count, |
|
|
|
grpc_server *server) { |
|
|
|
grpc_server *server) { |
|
|
|
size_t i, j; |
|
|
|
size_t i; |
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
|
|
|
grpc_udp_listener *sp; |
|
|
|
GPR_ASSERT(s->active_ports == 0); |
|
|
|
GPR_ASSERT(s->active_ports == 0); |
|
|
|
s->pollsets = pollsets; |
|
|
|
s->pollsets = pollsets; |
|
|
|
s->grpc_server = server; |
|
|
|
s->grpc_server = server; |
|
|
|
for (i = 0; i < s->nports; i++) { |
|
|
|
|
|
|
|
for (j = 0; j < pollset_count; j++) { |
|
|
|
sp = s->head; |
|
|
|
grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); |
|
|
|
while (sp != NULL) { |
|
|
|
|
|
|
|
for (i = 0; i < pollset_count; i++) { |
|
|
|
|
|
|
|
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); |
|
|
|
} |
|
|
|
} |
|
|
|
s->ports[i].read_closure.cb = on_read; |
|
|
|
sp->read_closure.cb = on_read; |
|
|
|
s->ports[i].read_closure.cb_arg = &s->ports[i]; |
|
|
|
sp->read_closure.cb_arg = sp; |
|
|
|
grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd, |
|
|
|
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); |
|
|
|
&s->ports[i].read_closure); |
|
|
|
|
|
|
|
s->active_ports++; |
|
|
|
s->active_ports++; |
|
|
|
|
|
|
|
sp = sp->next; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|