From 5eb4e1caef902d65df7c7d519b2c2890998d153f Mon Sep 17 00:00:00 2001 From: Nicolas Noble Date: Wed, 18 Nov 2015 17:19:45 -0800 Subject: [PATCH 1/4] Adding grpc_tcp_listener as an actual object being returned. --- src/core/iomgr/tcp_server.h | 20 ++-- src/core/iomgr/tcp_server_posix.c | 136 +++++++++++++++-------- src/core/iomgr/tcp_server_windows.c | 111 ++++++++++-------- src/core/security/server_secure_chttp2.c | 4 +- src/core/surface/server_chttp2.c | 4 +- test/core/util/reconnect_server.c | 4 +- 6 files changed, 177 insertions(+), 102 deletions(-) diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 882635f6388..3df36174e7b 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -39,6 +39,9 @@ /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; +/* Forward decl of grpc_tcp_listener */ +typedef struct grpc_tcp_listener grpc_tcp_listener; + /* Called for newly connected TCP connections. */ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *ep); @@ -51,19 +54,18 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_pollset **pollsets, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void *cb_arg); -/* Add a port to the server, returning port number on success, or negative - on failure. +/* Add a port to the server, returning the newly created listener on success, + or a null pointer on failure. The :: and 0.0.0.0 wildcard addresses are treated identically, accepting both IPv4 and IPv6 connections, but :: is the preferred style. This usually creates one socket, but possibly two on systems which support IPv6, - but not dualstack sockets. - - For raw access to the underlying sockets, see grpc_tcp_server_get_fd(). */ + but not dualstack sockets. */ /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle all of the multiple socket port matching logic in one place */ -int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len); +grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, + const void *addr, + size_t addr_len); /* Returns the file descriptor of the Nth listening socket on this server, or -1 if the index is out of bounds. @@ -75,4 +77,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index); void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_closure *closure); +int grpc_tcp_listener_get_port(grpc_tcp_listener *listener); +void grpc_tcp_listener_ref(grpc_tcp_listener *listener); +void grpc_tcp_listener_unref(grpc_tcp_listener *listener); + #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 99c76dcbe9a..3dab652efa3 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -74,7 +74,7 @@ static gpr_once s_init_max_accept_queue_size; static int s_max_accept_queue_size; /* one listening port */ -typedef struct { +typedef struct server_port { int fd; grpc_fd *emfd; grpc_tcp_server *server; @@ -84,8 +84,13 @@ typedef struct { struct sockaddr_un un; } addr; size_t addr_len; + int port; grpc_closure read_closure; grpc_closure destroyed_closure; + gpr_refcount refs; + struct server_port *next; + struct server_port *dual_stack_second_port; + int is_dual_stack_second_port; } server_port; static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { @@ -112,10 +117,9 @@ struct grpc_tcp_server { /* is this server shutting down? (boolean) */ int shutdown; - /* all listening ports */ - server_port *ports; - size_t nports; - size_t port_capacity; + /* linked list of server ports */ + server_port *head; + unsigned nports; /* shutdown callback */ grpc_closure *shutdown_complete; @@ -134,18 +138,22 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->shutdown = 0; s->on_accept_cb = NULL; s->on_accept_cb_arg = NULL; - s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->head = NULL; s->nports = 0; - s->port_capacity = INIT_PORT_CAP; return s; } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + server_port *sp; + grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); gpr_mu_destroy(&s->mu); - gpr_free(s->ports); + for (sp = s->head; sp; sp = sp->next) { + grpc_tcp_listener_unref((grpc_tcp_listener *)sp); + } + gpr_free(s); } @@ -166,8 +174,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) { events will be received on them - at this point it's safe to destroy things */ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - size_t i; - /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -176,9 +182,9 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { return; } - if (s->nports) { - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; + if (s->head) { + server_port *sp; + for (sp = s->head; sp; sp = sp->next) { if (sp->addr.sockaddr.sa_family == AF_UNIX) { unlink_if_unix_domain_socket(&sp->addr.un); } @@ -196,7 +202,6 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, grpc_closure *closure) { - size_t i; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -206,8 +211,9 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, /* shutdown all fd's */ if (s->active_ports) { - for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown(exec_ctx, s->ports[i].emfd); + server_port *sp; + for (sp = s->head; sp; sp = sp->next) { + grpc_fd_shutdown(exec_ctx, sp->emfd); } gpr_mu_unlock(&s->mu); } else { @@ -364,9 +370,10 @@ error: } } -static int add_socket_to_server(grpc_tcp_server *s, int fd, - const struct sockaddr *addr, size_t addr_len) { - server_port *sp; +static server_port *add_socket_to_server(grpc_tcp_server *s, int fd, + const struct sockaddr *addr, + size_t addr_len) { + server_port *sp = NULL; int port; char *addr_str; char *name; @@ -376,32 +383,35 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); + s->nports++; GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - /* append it to the list under a lock */ - if (s->nports == s->port_capacity) { - s->port_capacity *= 2; - s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); - } - sp = &s->ports[s->nports++]; + sp = gpr_malloc(sizeof(server_port)); + sp->next = s->head; + s->head = sp; sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; + sp->port = port; + sp->is_dual_stack_second_port = 0; + sp->dual_stack_second_port = NULL; + gpr_ref_init(&sp->refs, 1); GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(addr_str); gpr_free(name); } - return port; + return sp; } -int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len) { - int allocated_port1 = -1; - int allocated_port2 = -1; - unsigned i; +grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, + const void *addr, + size_t addr_len) { + int allocated_port = -1; + server_port *sp; + server_port *sp2 = NULL; int fd; grpc_dualstack_mode dsmode; struct sockaddr_in6 addr6_v4mapped; @@ -420,9 +430,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { - for (i = 0; i < s->nports; i++) { + for (sp = s->head; sp; sp = sp->next) { sockname_len = sizeof(sockname_temp); - if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, + if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp, &sockname_len)) { port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); if (port > 0) { @@ -436,6 +446,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, } } + sp = NULL; + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { addr = (const struct sockaddr *)&addr6_v4mapped; addr_len = sizeof(addr6_v4mapped); @@ -449,14 +461,16 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, addr = (struct sockaddr *)&wild6; addr_len = sizeof(wild6); fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); - allocated_port1 = add_socket_to_server(s, fd, addr, addr_len); + sp = add_socket_to_server(s, fd, addr, addr_len); + allocated_port = sp->port; if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ - if (port == 0 && allocated_port1 > 0) { - grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1); + if (port == 0 && allocated_port > 0) { + grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port); + sp2 = sp; } addr = (struct sockaddr *)&wild4; addr_len = sizeof(wild4); @@ -471,22 +485,31 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, addr = (struct sockaddr *)&addr4_copy; addr_len = sizeof(addr4_copy); } - allocated_port2 = add_socket_to_server(s, fd, addr, addr_len); + sp = add_socket_to_server(s, fd, addr, addr_len); + sp->dual_stack_second_port = sp2; + if (sp2) sp2->is_dual_stack_second_port = 1; done: gpr_free(allocated_addr); - return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; + return (grpc_tcp_listener *)sp; } int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { - return (port_index < s->nports) ? s->ports[port_index].fd : -1; + server_port *sp; + for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--); + if (port_index == 0 && sp) { + return sp->fd; + } else { + return -1; + } } void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void *on_accept_cb_arg) { - size_t i, j; + size_t i; + server_port *sp; GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); GPR_ASSERT(!s->on_accept_cb); @@ -495,17 +518,36 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, s->on_accept_cb_arg = on_accept_cb_arg; s->pollsets = pollsets; s->pollset_count = pollset_count; - for (i = 0; i < s->nports; i++) { - for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); + for (sp = s->head; sp; sp = sp->next) { + for (i = 0; i < pollset_count; i++) { + grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); } - s->ports[i].read_closure.cb = on_read; - s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd, - &s->ports[i].read_closure); + sp->read_closure.cb = on_read; + sp->read_closure.cb_arg = sp; + grpc_fd_notify_on_read(exec_ctx, sp->emfd, + &sp->read_closure); s->active_ports++; } gpr_mu_unlock(&s->mu); } +int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { + server_port *sp = (server_port *)listener; + return sp->port; +} + +void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { + server_port *sp = (server_port *)listener; + gpr_ref(&sp->refs); +} + +void grpc_tcp_listener_unref(grpc_tcp_listener *listener) { + server_port *sp = (server_port *)listener; + if (sp->is_dual_stack_second_port) return; + if (gpr_unref(&sp->refs)) { + if (sp->dual_stack_second_port) gpr_free(sp->dual_stack_second_port); + gpr_free(listener); + } +} + #endif diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 3fea8b5b350..c7a9fffa101 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -35,7 +35,8 @@ #ifdef GPR_WINSOCK_SOCKET -#define _GNU_SOURCE +#include + #include "src/core/iomgr/sockaddr_utils.h" #include @@ -51,7 +52,6 @@ #include "src/core/iomgr/tcp_server.h" #include "src/core/iomgr/tcp_windows.h" -#define INIT_PORT_CAP 2 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 /* one listening port */ @@ -61,14 +61,19 @@ typedef struct server_port { gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2]; /* This will hold the socket for the next accept. */ SOCKET new_socket; - /* The listener winsocked. */ + /* The listener winsocket. */ grpc_winsocket *socket; + /* The actual TCP port number. */ + int port; grpc_tcp_server *server; /* The cached AcceptEx for that port. */ LPFN_ACCEPTEX AcceptEx; int shutting_down; /* closure for socket notification of accept being ready */ grpc_closure on_accept; + gpr_refcount refs; + /* linked list */ + struct server_port *next; } server_port; /* the overall server */ @@ -82,10 +87,8 @@ struct grpc_tcp_server { /* active port count: how many ports are actually still listening */ int active_ports; - /* all listening ports */ - server_port *ports; - size_t nports; - size_t port_capacity; + /* linked list of server ports */ + server_port *head; /* shutdown callback */ grpc_closure *shutdown_complete; @@ -99,9 +102,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->active_ports = 0; s->on_accept_cb = NULL; s->on_accept_cb_arg = NULL; - s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); - s->nports = 0; - s->port_capacity = INIT_PORT_CAP; + s->head = NULL; s->shutdown_complete = NULL; return s; } @@ -109,26 +110,26 @@ grpc_tcp_server *grpc_tcp_server_create(void) { static void dont_care_about_shutdown_completion(void *arg) {} static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - size_t i; - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); /* Now that the accepts have been aborted, we can destroy the sockets. The IOCP won't get notified on these, so we can flag them as already closed by the system. */ - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; + while (s->head) { + server_port *sp = s->head; + s->head = sp->next; + sp->next = NULL; grpc_winsocket_destroy(sp->socket); + grpc_tcp_listener_unref((grpc_tcp_listener *) sp); } - gpr_free(s->ports); gpr_free(s); } /* Public function. Stops and destroys a grpc_tcp_server. */ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, grpc_closure *shutdown_complete) { - size_t i; int immediately_done = 0; + server_port *sp; gpr_mu_lock(&s->mu); s->shutdown_complete = shutdown_complete; @@ -138,8 +139,7 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, if (s->active_ports == 0) { immediately_done = 1; } - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; + for (sp = s->head; sp; sp = sp->next) { sp->shutting_down = 1; grpc_winsocket_shutdown(sp->socket); } @@ -351,16 +351,17 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { start_accept(exec_ctx, sp); } -static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, - const struct sockaddr *addr, size_t addr_len) { - server_port *sp; +static server_port *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, + const struct sockaddr *addr, + size_t addr_len) { + server_port *sp = NULL; int port; int status; GUID guid = WSAID_ACCEPTEX; DWORD ioctl_num_bytes; LPFN_ACCEPTEX AcceptEx; - if (sock == INVALID_SOCKET) return -1; + if (sock == INVALID_SOCKET) return NULL; /* We need to grab the AcceptEx pointer for that port, as it may be interface-dependent. We'll cache it to avoid doing that again. */ @@ -373,37 +374,35 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); gpr_free(utf8_message); closesocket(sock); - return -1; + return NULL; } port = prepare_socket(sock, addr, addr_len); if (port >= 0) { gpr_mu_lock(&s->mu); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - /* append it to the list under a lock */ - if (s->nports == s->port_capacity) { - /* too many ports, and we need to store their address in a closure */ - /* TODO(ctiller): make server_port a linked list */ - abort(); - } - sp = &s->ports[s->nports++]; + sp = gpr_malloc(sizeof(server_port)); + sp->next = s->head; + s->head = sp; sp->server = s; sp->socket = grpc_winsocket_create(sock, "listener"); sp->shutting_down = 0; sp->AcceptEx = AcceptEx; sp->new_socket = INVALID_SOCKET; + sp->port = port; + gpr_ref_init(&sp->refs, 1); grpc_closure_init(&sp->on_accept, on_accept, sp); GPR_ASSERT(sp->socket); gpr_mu_unlock(&s->mu); } - return port; + return sp; } -int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len) { - int allocated_port = -1; - unsigned i; +grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, + const void *addr, + size_t addr_len) { + server_port *sp; SOCKET sock; struct sockaddr_in6 addr6_v4mapped; struct sockaddr_in6 wildcard; @@ -415,9 +414,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { - for (i = 0; i < s->nports; i++) { + for (sp = s->head; sp; sp = sp->next) { sockname_len = sizeof(sockname_temp); - if (0 == getsockname(s->ports[i].socket->socket, + if (0 == getsockname(sp->socket->socket, (struct sockaddr *)&sockname_temp, &sockname_len)) { port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); if (port > 0) { @@ -452,33 +451,55 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, gpr_free(utf8_message); } - allocated_port = add_socket_to_server(s, sock, addr, addr_len); + sp = add_socket_to_server(s, sock, addr, addr_len); gpr_free(allocated_addr); - return allocated_port; + return (grpc_tcp_listener *)sp; } -SOCKET -grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) { - return (index < s->nports) ? s->ports[index].socket->socket : INVALID_SOCKET; +int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { + server_port *sp; + for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--); + if (port_index == 0 && sp) { + return _open_osfhandle(sp->socket->socket, 0); + } else { + return -1; + } } void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, grpc_pollset **pollset, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void *on_accept_cb_arg) { - size_t i; + server_port *sp; GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(s->active_ports == 0); s->on_accept_cb = on_accept_cb; s->on_accept_cb_arg = on_accept_cb_arg; - for (i = 0; i < s->nports; i++) { - start_accept(exec_ctx, s->ports + i); + for (sp = s->head; sp; sp = sp->next) { + start_accept(exec_ctx, sp); s->active_ports++; } gpr_mu_unlock(&s->mu); } +int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { + server_port *sp = (server_port *)listener; + return sp->port; +} + +void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { + server_port *sp = (server_port *)listener; + gpr_ref(&sp->refs); +} + +void grpc_tcp_listener_unref(grpc_tcp_listener *listener) { + server_port *sp = (server_port *)listener; + if (gpr_unref(&sp->refs)) { + gpr_free(listener); + } +} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 851e0cfab31..1ea269bf8f0 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -249,9 +249,11 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, } for (i = 0; i < resolved->naddrs; i++) { - port_temp = grpc_tcp_server_add_port( + grpc_tcp_listener *listener; + listener = grpc_tcp_server_add_port( tcp, (struct sockaddr *)&resolved->addrs[i].addr, resolved->addrs[i].len); + port_temp = grpc_tcp_listener_get_port(listener); if (port_temp >= 0) { if (port_num == -1) { port_num = port_temp; diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 580b91573c2..1408f9c1592 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -107,9 +107,11 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { } for (i = 0; i < resolved->naddrs; i++) { - port_temp = grpc_tcp_server_add_port( + grpc_tcp_listener *listener; + listener = grpc_tcp_server_add_port( tcp, (struct sockaddr *)&resolved->addrs[i].addr, resolved->addrs[i].len); + port_temp = grpc_tcp_listener_get_port(listener); if (port_temp >= 0) { if (port_num == -1) { port_num = port_temp; diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index ee481ef6749..c064fb32c6e 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -113,6 +113,7 @@ void reconnect_server_init(reconnect_server *server) { void reconnect_server_start(reconnect_server *server, int port) { struct sockaddr_in addr; + grpc_tcp_listener *listener; int port_added; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -121,8 +122,9 @@ void reconnect_server_start(reconnect_server *server, int port) { memset(&addr.sin_addr, 0, sizeof(addr.sin_addr)); server->tcp_server = grpc_tcp_server_create(); - port_added = + listener = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); + port_added = grpc_tcp_listener_get_port(listener); GPR_ASSERT(port_added == port); grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1, From 8f714620b21d6c25b9d67d6d36b1e216dc8cbe94 Mon Sep 17 00:00:00 2001 From: Nicolas Noble Date: Thu, 19 Nov 2015 11:16:54 -0800 Subject: [PATCH 2/4] Renaming server_port to grpc_tcp_listener - there's no need for casting... --- src/core/iomgr/tcp_server_posix.c | 42 ++++++++++++++--------------- src/core/iomgr/tcp_server_windows.c | 40 +++++++++++++-------------- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 3dab652efa3..fbc0e974e67 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -74,7 +74,7 @@ static gpr_once s_init_max_accept_queue_size; static int s_max_accept_queue_size; /* one listening port */ -typedef struct server_port { +struct grpc_tcp_listener { int fd; grpc_fd *emfd; grpc_tcp_server *server; @@ -88,10 +88,10 @@ typedef struct server_port { grpc_closure read_closure; grpc_closure destroyed_closure; gpr_refcount refs; - struct server_port *next; - struct server_port *dual_stack_second_port; + struct grpc_tcp_listener *next; + struct grpc_tcp_listener *dual_stack_second_port; int is_dual_stack_second_port; -} server_port; +}; static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { struct stat st; @@ -118,7 +118,7 @@ struct grpc_tcp_server { int shutdown; /* linked list of server ports */ - server_port *head; + grpc_tcp_listener *head; unsigned nports; /* shutdown callback */ @@ -144,14 +144,14 @@ grpc_tcp_server *grpc_tcp_server_create(void) { } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - server_port *sp; + grpc_tcp_listener *sp; grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); gpr_mu_destroy(&s->mu); for (sp = s->head; sp; sp = sp->next) { - grpc_tcp_listener_unref((grpc_tcp_listener *)sp); + grpc_tcp_listener_unref(sp); } gpr_free(s); @@ -183,7 +183,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } if (s->head) { - server_port *sp; + grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { if (sp->addr.sockaddr.sa_family == AF_UNIX) { unlink_if_unix_domain_socket(&sp->addr.un); @@ -211,7 +211,7 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, /* shutdown all fd's */ if (s->active_ports) { - server_port *sp; + grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { grpc_fd_shutdown(exec_ctx, sp->emfd); } @@ -304,7 +304,7 @@ error: /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { - server_port *sp = arg; + grpc_tcp_listener *sp = arg; grpc_fd *fdobj; size_t i; @@ -370,10 +370,10 @@ error: } } -static server_port *add_socket_to_server(grpc_tcp_server *s, int fd, +static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, const struct sockaddr *addr, size_t addr_len) { - server_port *sp = NULL; + grpc_tcp_listener *sp = NULL; int port; char *addr_str; char *name; @@ -385,7 +385,7 @@ static server_port *add_socket_to_server(grpc_tcp_server *s, int fd, gpr_mu_lock(&s->mu); s->nports++; GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(server_port)); + sp = gpr_malloc(sizeof(grpc_tcp_listener)); sp->next = s->head; s->head = sp; sp->server = s; @@ -410,8 +410,8 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, size_t addr_len) { int allocated_port = -1; - server_port *sp; - server_port *sp2 = NULL; + grpc_tcp_listener *sp; + grpc_tcp_listener *sp2 = NULL; int fd; grpc_dualstack_mode dsmode; struct sockaddr_in6 addr6_v4mapped; @@ -491,11 +491,11 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, done: gpr_free(allocated_addr); - return (grpc_tcp_listener *)sp; + return sp; } int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { - server_port *sp; + grpc_tcp_listener *sp; for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--); if (port_index == 0 && sp) { return sp->fd; @@ -509,7 +509,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, grpc_tcp_server_cb on_accept_cb, void *on_accept_cb_arg) { size_t i; - server_port *sp; + grpc_tcp_listener *sp; GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); GPR_ASSERT(!s->on_accept_cb); @@ -532,17 +532,17 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, } int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { - server_port *sp = (server_port *)listener; + grpc_tcp_listener *sp = listener; return sp->port; } void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { - server_port *sp = (server_port *)listener; + grpc_tcp_listener *sp = listener; gpr_ref(&sp->refs); } void grpc_tcp_listener_unref(grpc_tcp_listener *listener) { - server_port *sp = (server_port *)listener; + grpc_tcp_listener *sp = listener; if (sp->is_dual_stack_second_port) return; if (gpr_unref(&sp->refs)) { if (sp->dual_stack_second_port) gpr_free(sp->dual_stack_second_port); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index c7a9fffa101..e4a1d7f498c 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -55,7 +55,7 @@ #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 /* one listening port */ -typedef struct server_port { +struct grpc_tcp_listener { /* This seemingly magic number comes from AcceptEx's documentation. each address buffer needs to have at least 16 more bytes at their end. */ gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2]; @@ -73,8 +73,8 @@ typedef struct server_port { grpc_closure on_accept; gpr_refcount refs; /* linked list */ - struct server_port *next; -} server_port; + struct grpc_tcp_listener *next; +}; /* the overall server */ struct grpc_tcp_server { @@ -88,7 +88,7 @@ struct grpc_tcp_server { int active_ports; /* linked list of server ports */ - server_port *head; + grpc_tcp_listener *head; /* shutdown callback */ grpc_closure *shutdown_complete; @@ -116,11 +116,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { The IOCP won't get notified on these, so we can flag them as already closed by the system. */ while (s->head) { - server_port *sp = s->head; + grpc_tcp_listener *sp = s->head; s->head = sp->next; sp->next = NULL; grpc_winsocket_destroy(sp->socket); - grpc_tcp_listener_unref((grpc_tcp_listener *) sp); + grpc_tcp_listener_unref(sp); } gpr_free(s); } @@ -129,7 +129,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, grpc_closure *shutdown_complete) { int immediately_done = 0; - server_port *sp; + grpc_tcp_listener *sp; gpr_mu_lock(&s->mu); s->shutdown_complete = shutdown_complete; @@ -199,7 +199,7 @@ error: } static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, - server_port *sp) { + grpc_tcp_listener *sp) { int notify = 0; sp->shutting_down = 0; gpr_mu_lock(&sp->server->mu); @@ -216,7 +216,7 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, /* In order to do an async accept, we need to create a socket first which will be the one assigned to the new incoming connection. */ -static void start_accept(grpc_exec_ctx *exec_ctx, server_port *port) { +static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) { SOCKET sock = INVALID_SOCKET; char *message; char *utf8_message; @@ -276,7 +276,7 @@ failure: /* Event manager callback when reads are ready. */ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { - server_port *sp = arg; + grpc_tcp_listener *sp = arg; SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; @@ -351,10 +351,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { start_accept(exec_ctx, sp); } -static server_port *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, +static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, const struct sockaddr *addr, size_t addr_len) { - server_port *sp = NULL; + grpc_tcp_listener *sp = NULL; int port; int status; GUID guid = WSAID_ACCEPTEX; @@ -381,7 +381,7 @@ static server_port *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, if (port >= 0) { gpr_mu_lock(&s->mu); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(server_port)); + sp = gpr_malloc(sizeof(grpc_tcp_listener)); sp->next = s->head; s->head = sp; sp->server = s; @@ -402,7 +402,7 @@ static server_port *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, size_t addr_len) { - server_port *sp; + grpc_tcp_listener *sp; SOCKET sock; struct sockaddr_in6 addr6_v4mapped; struct sockaddr_in6 wildcard; @@ -454,11 +454,11 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, sp = add_socket_to_server(s, sock, addr, addr_len); gpr_free(allocated_addr); - return (grpc_tcp_listener *)sp; + return sp; } int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { - server_port *sp; + grpc_tcp_listener *sp; for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--); if (port_index == 0 && sp) { return _open_osfhandle(sp->socket->socket, 0); @@ -471,7 +471,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, grpc_pollset **pollset, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void *on_accept_cb_arg) { - server_port *sp; + grpc_tcp_listener *sp; GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); GPR_ASSERT(!s->on_accept_cb); @@ -486,17 +486,17 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, } int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { - server_port *sp = (server_port *)listener; + grpc_tcp_listener *sp = listener; return sp->port; } void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { - server_port *sp = (server_port *)listener; + grpc_tcp_listener *sp = listener; gpr_ref(&sp->refs); } void grpc_tcp_listener_unref(grpc_tcp_listener *listener) { - server_port *sp = (server_port *)listener; + grpc_tcp_listener *sp = listener; if (gpr_unref(&sp->refs)) { gpr_free(listener); } From c6a7c6e02cdf52c0ff99f4451923e3858db74ed4 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Thu, 19 Nov 2015 21:55:44 +0100 Subject: [PATCH 3/4] Fixing use after free under Posix (was already fixed for Windows.) --- src/core/iomgr/tcp_server_posix.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index fbc0e974e67..5daf406c99d 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -144,13 +144,13 @@ grpc_tcp_server *grpc_tcp_server_create(void) { } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - grpc_tcp_listener *sp; - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); gpr_mu_destroy(&s->mu); - for (sp = s->head; sp; sp = sp->next) { + while (s->head) { + grpc_tcp_listener *sp = s->head; + s->head = sp->next; grpc_tcp_listener_unref(sp); } From d86115e2f6f1797adaa9a2ab8864f1feb9615dfa Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Fri, 20 Nov 2015 05:56:25 +0100 Subject: [PATCH 4/4] Renaming dual_stack_second_port to sibling. --- src/core/iomgr/tcp_server_posix.c | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 5daf406c99d..1439dfcd6ed 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -67,7 +67,6 @@ #include #include -#define INIT_PORT_CAP 2 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 static gpr_once s_init_max_accept_queue_size; @@ -89,8 +88,12 @@ struct grpc_tcp_listener { grpc_closure destroyed_closure; gpr_refcount refs; struct grpc_tcp_listener *next; - struct grpc_tcp_listener *dual_stack_second_port; - int is_dual_stack_second_port; + /* When we add a listener, more than one can be created, mainly because of + IPv6. A sibling will still be in the normal list, but will be flagged + as such. Any action, such as ref or unref, will affect all of the + siblings in the list. */ + struct grpc_tcp_listener *sibling; + int is_sibling; }; static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { @@ -394,8 +397,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; sp->port = port; - sp->is_dual_stack_second_port = 0; - sp->dual_stack_second_port = NULL; + sp->is_sibling = 0; + sp->sibling = NULL; gpr_ref_init(&sp->refs, 1); GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); @@ -486,8 +489,8 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, addr_len = sizeof(addr4_copy); } sp = add_socket_to_server(s, fd, addr, addr_len); - sp->dual_stack_second_port = sp2; - if (sp2) sp2->is_dual_stack_second_port = 1; + sp->sibling = sp2; + if (sp2) sp2->is_sibling = 1; done: gpr_free(allocated_addr); @@ -543,9 +546,14 @@ void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { void grpc_tcp_listener_unref(grpc_tcp_listener *listener) { grpc_tcp_listener *sp = listener; - if (sp->is_dual_stack_second_port) return; + if (sp->is_sibling) return; if (gpr_unref(&sp->refs)) { - if (sp->dual_stack_second_port) gpr_free(sp->dual_stack_second_port); + grpc_tcp_listener *sibling = sp->sibling; + while (sibling) { + sp = sibling; + sibling = sp->sibling; + gpr_free(sp); + } gpr_free(listener); } }