Adding grpc_tcp_listener as an actual object being returned.

pull/4157/head
Nicolas Noble 9 years ago committed by Nicolas "Pixel" Noble
parent 75b53d6a5d
commit 5eb4e1caef
  1. 20
      src/core/iomgr/tcp_server.h
  2. 136
      src/core/iomgr/tcp_server_posix.c
  3. 111
      src/core/iomgr/tcp_server_windows.c
  4. 4
      src/core/security/server_secure_chttp2.c
  5. 4
      src/core/surface/server_chttp2.c
  6. 4
      test/core/util/reconnect_server.c

@ -39,6 +39,9 @@
/* Forward decl of grpc_tcp_server */ /* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server; typedef struct grpc_tcp_server grpc_tcp_server;
/* Forward decl of grpc_tcp_listener */
typedef struct grpc_tcp_listener grpc_tcp_listener;
/* Called for newly connected TCP connections. */ /* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep); grpc_endpoint *ep);
@ -51,19 +54,18 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_pollset **pollsets, size_t pollset_count, grpc_pollset **pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb, void *cb_arg); grpc_tcp_server_cb on_accept_cb, void *cb_arg);
/* Add a port to the server, returning port number on success, or negative /* Add a port to the server, returning the newly created listener on success,
on failure. or a null pointer on failure.
The :: and 0.0.0.0 wildcard addresses are treated identically, accepting The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
both IPv4 and IPv6 connections, but :: is the preferred style. This usually both IPv4 and IPv6 connections, but :: is the preferred style. This usually
creates one socket, but possibly two on systems which support IPv6, creates one socket, but possibly two on systems which support IPv6,
but not dualstack sockets. but not dualstack sockets. */
For raw access to the underlying sockets, see grpc_tcp_server_get_fd(). */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */ all of the multiple socket port matching logic in one place */
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
size_t addr_len); const void *addr,
size_t addr_len);
/* Returns the file descriptor of the Nth listening socket on this server, /* Returns the file descriptor of the Nth listening socket on this server,
or -1 if the index is out of bounds. or -1 if the index is out of bounds.
@ -75,4 +77,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_closure *closure); grpc_closure *closure);
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener);
void grpc_tcp_listener_ref(grpc_tcp_listener *listener);
void grpc_tcp_listener_unref(grpc_tcp_listener *listener);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */

@ -74,7 +74,7 @@ static gpr_once s_init_max_accept_queue_size;
static int s_max_accept_queue_size; static int s_max_accept_queue_size;
/* one listening port */ /* one listening port */
typedef struct { typedef struct server_port {
int fd; int fd;
grpc_fd *emfd; grpc_fd *emfd;
grpc_tcp_server *server; grpc_tcp_server *server;
@ -84,8 +84,13 @@ typedef struct {
struct sockaddr_un un; struct sockaddr_un un;
} addr; } addr;
size_t addr_len; size_t addr_len;
int port;
grpc_closure read_closure; grpc_closure read_closure;
grpc_closure destroyed_closure; grpc_closure destroyed_closure;
gpr_refcount refs;
struct server_port *next;
struct server_port *dual_stack_second_port;
int is_dual_stack_second_port;
} server_port; } server_port;
static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
@ -112,10 +117,9 @@ struct grpc_tcp_server {
/* is this server shutting down? (boolean) */ /* is this server shutting down? (boolean) */
int shutdown; int shutdown;
/* all listening ports */ /* linked list of server ports */
server_port *ports; server_port *head;
size_t nports; unsigned nports;
size_t port_capacity;
/* shutdown callback */ /* shutdown callback */
grpc_closure *shutdown_complete; grpc_closure *shutdown_complete;
@ -134,18 +138,22 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->shutdown = 0; s->shutdown = 0;
s->on_accept_cb = NULL; s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL; s->on_accept_cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->head = NULL;
s->nports = 0; s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
return s; return s;
} }
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
server_port *sp;
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
gpr_mu_destroy(&s->mu); gpr_mu_destroy(&s->mu);
gpr_free(s->ports); for (sp = s->head; sp; sp = sp->next) {
grpc_tcp_listener_unref((grpc_tcp_listener *)sp);
}
gpr_free(s); gpr_free(s);
} }
@ -166,8 +174,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) {
events will be received on them - at this point it's safe to destroy events will be received on them - at this point it's safe to destroy
things */ things */
static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
size_t i;
/* delete ALL the things */ /* delete ALL the things */
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
@ -176,9 +182,9 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
return; return;
} }
if (s->nports) { if (s->head) {
for (i = 0; i < s->nports; i++) { server_port *sp;
server_port *sp = &s->ports[i]; for (sp = s->head; sp; sp = sp->next) {
if (sp->addr.sockaddr.sa_family == AF_UNIX) { if (sp->addr.sockaddr.sa_family == AF_UNIX) {
unlink_if_unix_domain_socket(&sp->addr.un); unlink_if_unix_domain_socket(&sp->addr.un);
} }
@ -196,7 +202,6 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_closure *closure) { grpc_closure *closure) {
size_t i;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown); GPR_ASSERT(!s->shutdown);
@ -206,8 +211,9 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
/* shutdown all fd's */ /* shutdown all fd's */
if (s->active_ports) { if (s->active_ports) {
for (i = 0; i < s->nports; i++) { server_port *sp;
grpc_fd_shutdown(exec_ctx, s->ports[i].emfd); for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(exec_ctx, sp->emfd);
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} else { } else {
@ -364,9 +370,10 @@ error:
} }
} }
static int add_socket_to_server(grpc_tcp_server *s, int fd, static server_port *add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr, size_t addr_len) { const struct sockaddr *addr,
server_port *sp; size_t addr_len) {
server_port *sp = NULL;
int port; int port;
char *addr_str; char *addr_str;
char *name; char *name;
@ -376,32 +383,35 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
s->nports++;
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
/* append it to the list under a lock */ sp = gpr_malloc(sizeof(server_port));
if (s->nports == s->port_capacity) { sp->next = s->head;
s->port_capacity *= 2; s->head = sp;
s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
}
sp = &s->ports[s->nports++];
sp->server = s; sp->server = s;
sp->fd = fd; sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name); sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len); memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len; sp->addr_len = addr_len;
sp->port = port;
sp->is_dual_stack_second_port = 0;
sp->dual_stack_second_port = NULL;
gpr_ref_init(&sp->refs, 1);
GPR_ASSERT(sp->emfd); GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
gpr_free(addr_str); gpr_free(addr_str);
gpr_free(name); gpr_free(name);
} }
return port; return sp;
} }
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
size_t addr_len) { const void *addr,
int allocated_port1 = -1; size_t addr_len) {
int allocated_port2 = -1; int allocated_port = -1;
unsigned i; server_port *sp;
server_port *sp2 = NULL;
int fd; int fd;
grpc_dualstack_mode dsmode; grpc_dualstack_mode dsmode;
struct sockaddr_in6 addr6_v4mapped; struct sockaddr_in6 addr6_v4mapped;
@ -420,9 +430,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
/* Check if this is a wildcard port, and if so, try to keep the port the same /* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */ as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) { if (grpc_sockaddr_get_port(addr) == 0) {
for (i = 0; i < s->nports; i++) { for (sp = s->head; sp; sp = sp->next) {
sockname_len = sizeof(sockname_temp); sockname_len = sizeof(sockname_temp);
if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp,
&sockname_len)) { &sockname_len)) {
port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
if (port > 0) { if (port > 0) {
@ -436,6 +446,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
} }
} }
sp = NULL;
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = (const struct sockaddr *)&addr6_v4mapped; addr = (const struct sockaddr *)&addr6_v4mapped;
addr_len = sizeof(addr6_v4mapped); addr_len = sizeof(addr6_v4mapped);
@ -449,14 +461,16 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
addr = (struct sockaddr *)&wild6; addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6); addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
allocated_port1 = add_socket_to_server(s, fd, addr, addr_len); sp = add_socket_to_server(s, fd, addr, addr_len);
allocated_port = sp->port;
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done; goto done;
} }
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
if (port == 0 && allocated_port1 > 0) { if (port == 0 && allocated_port > 0) {
grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1); grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port);
sp2 = sp;
} }
addr = (struct sockaddr *)&wild4; addr = (struct sockaddr *)&wild4;
addr_len = sizeof(wild4); addr_len = sizeof(wild4);
@ -471,22 +485,31 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
addr = (struct sockaddr *)&addr4_copy; addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy); addr_len = sizeof(addr4_copy);
} }
allocated_port2 = add_socket_to_server(s, fd, addr, addr_len); sp = add_socket_to_server(s, fd, addr, addr_len);
sp->dual_stack_second_port = sp2;
if (sp2) sp2->is_dual_stack_second_port = 1;
done: done:
gpr_free(allocated_addr); gpr_free(allocated_addr);
return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; return (grpc_tcp_listener *)sp;
} }
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
return (port_index < s->nports) ? s->ports[port_index].fd : -1; server_port *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--);
if (port_index == 0 && sp) {
return sp->fd;
} else {
return -1;
}
} }
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_pollset **pollsets, size_t pollset_count, grpc_pollset **pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb, grpc_tcp_server_cb on_accept_cb,
void *on_accept_cb_arg) { void *on_accept_cb_arg) {
size_t i, j; size_t i;
server_port *sp;
GPR_ASSERT(on_accept_cb); GPR_ASSERT(on_accept_cb);
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(!s->on_accept_cb);
@ -495,17 +518,36 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb_arg = on_accept_cb_arg; s->on_accept_cb_arg = on_accept_cb_arg;
s->pollsets = pollsets; s->pollsets = pollsets;
s->pollset_count = pollset_count; s->pollset_count = pollset_count;
for (i = 0; i < s->nports; i++) { for (sp = s->head; sp; sp = sp->next) {
for (j = 0; j < pollset_count; j++) { for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
} }
s->ports[i].read_closure.cb = on_read; sp->read_closure.cb = on_read;
s->ports[i].read_closure.cb_arg = &s->ports[i]; sp->read_closure.cb_arg = sp;
grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd, grpc_fd_notify_on_read(exec_ctx, sp->emfd,
&s->ports[i].read_closure); &sp->read_closure);
s->active_ports++; s->active_ports++;
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
server_port *sp = (server_port *)listener;
return sp->port;
}
void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
server_port *sp = (server_port *)listener;
gpr_ref(&sp->refs);
}
void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
server_port *sp = (server_port *)listener;
if (sp->is_dual_stack_second_port) return;
if (gpr_unref(&sp->refs)) {
if (sp->dual_stack_second_port) gpr_free(sp->dual_stack_second_port);
gpr_free(listener);
}
}
#endif #endif

@ -35,7 +35,8 @@
#ifdef GPR_WINSOCK_SOCKET #ifdef GPR_WINSOCK_SOCKET
#define _GNU_SOURCE #include <io.h>
#include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/sockaddr_utils.h"
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
@ -51,7 +52,6 @@
#include "src/core/iomgr/tcp_server.h" #include "src/core/iomgr/tcp_server.h"
#include "src/core/iomgr/tcp_windows.h" #include "src/core/iomgr/tcp_windows.h"
#define INIT_PORT_CAP 2
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
/* one listening port */ /* one listening port */
@ -61,14 +61,19 @@ typedef struct server_port {
gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2]; gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
/* This will hold the socket for the next accept. */ /* This will hold the socket for the next accept. */
SOCKET new_socket; SOCKET new_socket;
/* The listener winsocked. */ /* The listener winsocket. */
grpc_winsocket *socket; grpc_winsocket *socket;
/* The actual TCP port number. */
int port;
grpc_tcp_server *server; grpc_tcp_server *server;
/* The cached AcceptEx for that port. */ /* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx; LPFN_ACCEPTEX AcceptEx;
int shutting_down; int shutting_down;
/* closure for socket notification of accept being ready */ /* closure for socket notification of accept being ready */
grpc_closure on_accept; grpc_closure on_accept;
gpr_refcount refs;
/* linked list */
struct server_port *next;
} server_port; } server_port;
/* the overall server */ /* the overall server */
@ -82,10 +87,8 @@ struct grpc_tcp_server {
/* active port count: how many ports are actually still listening */ /* active port count: how many ports are actually still listening */
int active_ports; int active_ports;
/* all listening ports */ /* linked list of server ports */
server_port *ports; server_port *head;
size_t nports;
size_t port_capacity;
/* shutdown callback */ /* shutdown callback */
grpc_closure *shutdown_complete; grpc_closure *shutdown_complete;
@ -99,9 +102,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->active_ports = 0; s->active_ports = 0;
s->on_accept_cb = NULL; s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL; s->on_accept_cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->head = NULL;
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
s->shutdown_complete = NULL; s->shutdown_complete = NULL;
return s; return s;
} }
@ -109,26 +110,26 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
static void dont_care_about_shutdown_completion(void *arg) {} static void dont_care_about_shutdown_completion(void *arg) {}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
size_t i;
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
/* Now that the accepts have been aborted, we can destroy the sockets. /* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already The IOCP won't get notified on these, so we can flag them as already
closed by the system. */ closed by the system. */
for (i = 0; i < s->nports; i++) { while (s->head) {
server_port *sp = &s->ports[i]; server_port *sp = s->head;
s->head = sp->next;
sp->next = NULL;
grpc_winsocket_destroy(sp->socket); grpc_winsocket_destroy(sp->socket);
grpc_tcp_listener_unref((grpc_tcp_listener *) sp);
} }
gpr_free(s->ports);
gpr_free(s); gpr_free(s);
} }
/* Public function. Stops and destroys a grpc_tcp_server. */ /* Public function. Stops and destroys a grpc_tcp_server. */
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_closure *shutdown_complete) { grpc_closure *shutdown_complete) {
size_t i;
int immediately_done = 0; int immediately_done = 0;
server_port *sp;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
s->shutdown_complete = shutdown_complete; s->shutdown_complete = shutdown_complete;
@ -138,8 +139,7 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
if (s->active_ports == 0) { if (s->active_ports == 0) {
immediately_done = 1; immediately_done = 1;
} }
for (i = 0; i < s->nports; i++) { for (sp = s->head; sp; sp = sp->next) {
server_port *sp = &s->ports[i];
sp->shutting_down = 1; sp->shutting_down = 1;
grpc_winsocket_shutdown(sp->socket); grpc_winsocket_shutdown(sp->socket);
} }
@ -351,16 +351,17 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
start_accept(exec_ctx, sp); start_accept(exec_ctx, sp);
} }
static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, static server_port *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
const struct sockaddr *addr, size_t addr_len) { const struct sockaddr *addr,
server_port *sp; size_t addr_len) {
server_port *sp = NULL;
int port; int port;
int status; int status;
GUID guid = WSAID_ACCEPTEX; GUID guid = WSAID_ACCEPTEX;
DWORD ioctl_num_bytes; DWORD ioctl_num_bytes;
LPFN_ACCEPTEX AcceptEx; LPFN_ACCEPTEX AcceptEx;
if (sock == INVALID_SOCKET) return -1; if (sock == INVALID_SOCKET) return NULL;
/* We need to grab the AcceptEx pointer for that port, as it may be /* We need to grab the AcceptEx pointer for that port, as it may be
interface-dependent. We'll cache it to avoid doing that again. */ interface-dependent. We'll cache it to avoid doing that again. */
@ -373,37 +374,35 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
closesocket(sock); closesocket(sock);
return -1; return NULL;
} }
port = prepare_socket(sock, addr, addr_len); port = prepare_socket(sock, addr, addr_len);
if (port >= 0) { if (port >= 0) {
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
/* append it to the list under a lock */ sp = gpr_malloc(sizeof(server_port));
if (s->nports == s->port_capacity) { sp->next = s->head;
/* too many ports, and we need to store their address in a closure */ s->head = sp;
/* TODO(ctiller): make server_port a linked list */
abort();
}
sp = &s->ports[s->nports++];
sp->server = s; sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener"); sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0; sp->shutting_down = 0;
sp->AcceptEx = AcceptEx; sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET; sp->new_socket = INVALID_SOCKET;
sp->port = port;
gpr_ref_init(&sp->refs, 1);
grpc_closure_init(&sp->on_accept, on_accept, sp); grpc_closure_init(&sp->on_accept, on_accept, sp);
GPR_ASSERT(sp->socket); GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }
return port; return sp;
} }
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
size_t addr_len) { const void *addr,
int allocated_port = -1; size_t addr_len) {
unsigned i; server_port *sp;
SOCKET sock; SOCKET sock;
struct sockaddr_in6 addr6_v4mapped; struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in6 wildcard; struct sockaddr_in6 wildcard;
@ -415,9 +414,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
/* Check if this is a wildcard port, and if so, try to keep the port the same /* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */ as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) { if (grpc_sockaddr_get_port(addr) == 0) {
for (i = 0; i < s->nports; i++) { for (sp = s->head; sp; sp = sp->next) {
sockname_len = sizeof(sockname_temp); sockname_len = sizeof(sockname_temp);
if (0 == getsockname(s->ports[i].socket->socket, if (0 == getsockname(sp->socket->socket,
(struct sockaddr *)&sockname_temp, &sockname_len)) { (struct sockaddr *)&sockname_temp, &sockname_len)) {
port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
if (port > 0) { if (port > 0) {
@ -452,33 +451,55 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
gpr_free(utf8_message); gpr_free(utf8_message);
} }
allocated_port = add_socket_to_server(s, sock, addr, addr_len); sp = add_socket_to_server(s, sock, addr, addr_len);
gpr_free(allocated_addr); gpr_free(allocated_addr);
return allocated_port; return (grpc_tcp_listener *)sp;
} }
SOCKET int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) { server_port *sp;
return (index < s->nports) ? s->ports[index].socket->socket : INVALID_SOCKET; for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--);
if (port_index == 0 && sp) {
return _open_osfhandle(sp->socket->socket, 0);
} else {
return -1;
}
} }
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_pollset **pollset, size_t pollset_count, grpc_pollset **pollset, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb, grpc_tcp_server_cb on_accept_cb,
void *on_accept_cb_arg) { void *on_accept_cb_arg) {
size_t i; server_port *sp;
GPR_ASSERT(on_accept_cb); GPR_ASSERT(on_accept_cb);
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(!s->on_accept_cb);
GPR_ASSERT(s->active_ports == 0); GPR_ASSERT(s->active_ports == 0);
s->on_accept_cb = on_accept_cb; s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg; s->on_accept_cb_arg = on_accept_cb_arg;
for (i = 0; i < s->nports; i++) { for (sp = s->head; sp; sp = sp->next) {
start_accept(exec_ctx, s->ports + i); start_accept(exec_ctx, sp);
s->active_ports++; s->active_ports++;
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
server_port *sp = (server_port *)listener;
return sp->port;
}
void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
server_port *sp = (server_port *)listener;
gpr_ref(&sp->refs);
}
void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
server_port *sp = (server_port *)listener;
if (gpr_unref(&sp->refs)) {
gpr_free(listener);
}
}
#endif /* GPR_WINSOCK_SOCKET */ #endif /* GPR_WINSOCK_SOCKET */

@ -249,9 +249,11 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
} }
for (i = 0; i < resolved->naddrs; i++) { for (i = 0; i < resolved->naddrs; i++) {
port_temp = grpc_tcp_server_add_port( grpc_tcp_listener *listener;
listener = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr, tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len); resolved->addrs[i].len);
port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp >= 0) { if (port_temp >= 0) {
if (port_num == -1) { if (port_num == -1) {
port_num = port_temp; port_num = port_temp;

@ -107,9 +107,11 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
} }
for (i = 0; i < resolved->naddrs; i++) { for (i = 0; i < resolved->naddrs; i++) {
port_temp = grpc_tcp_server_add_port( grpc_tcp_listener *listener;
listener = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr, tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len); resolved->addrs[i].len);
port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp >= 0) { if (port_temp >= 0) {
if (port_num == -1) { if (port_num == -1) {
port_num = port_temp; port_num = port_temp;

@ -113,6 +113,7 @@ void reconnect_server_init(reconnect_server *server) {
void reconnect_server_start(reconnect_server *server, int port) { void reconnect_server_start(reconnect_server *server, int port) {
struct sockaddr_in addr; struct sockaddr_in addr;
grpc_tcp_listener *listener;
int port_added; int port_added;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -121,8 +122,9 @@ void reconnect_server_start(reconnect_server *server, int port) {
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr)); memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
server->tcp_server = grpc_tcp_server_create(); server->tcp_server = grpc_tcp_server_create();
port_added = listener =
grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr));
port_added = grpc_tcp_listener_get_port(listener);
GPR_ASSERT(port_added == port); GPR_ASSERT(port_added == port);
grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1, grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1,

Loading…
Cancel
Save