Make grpc_tcp_listener private.

reviewable/pr4680/r1
Dan Born 9 years ago
parent 7f34c783c4
commit fa6b606898
  1. 52
      src/core/iomgr/tcp_server.h
  2. 133
      src/core/iomgr/tcp_server_posix.c
  3. 120
      src/core/iomgr/tcp_server_windows.c
  4. 29
      src/core/security/server_secure_chttp2.c
  5. 15
      src/core/surface/server_chttp2.c
  6. 5
      test/core/client_config/set_initial_connect_string_test.c
  7. 54
      test/core/iomgr/tcp_server_posix_test.c
  8. 5
      test/core/util/reconnect_server.c
  9. 11
      test/core/util/test_tcp_server.c

@ -39,23 +39,25 @@
/* Forward decl of grpc_tcp_server */ /* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server; typedef struct grpc_tcp_server grpc_tcp_server;
/* Forward decl of grpc_tcp_listener */ /* Called for newly connected TCP connections. Callee owns a ref on
typedef struct grpc_tcp_listener grpc_tcp_listener; from_server. */
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep); grpc_endpoint *ep,
grpc_tcp_server *from_server,
unsigned port_index, unsigned fd_index);
/* Create a server, initially not bound to any ports */ /* Create a server, initially not bound to any ports. The caller owns one ref.
grpc_tcp_server *grpc_tcp_server_create(void); If shutdown_complete is not NULL, it will be used by
grpc_tcp_server_unref(). */
grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete);
/* Start listening to bound ports */ /* Start listening to bound ports */
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_pollset **pollsets, size_t pollset_count, grpc_pollset **pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb, void *cb_arg); grpc_tcp_server_cb on_accept_cb, void *cb_arg);
/* Add a port to the server, returning the newly created listener on success, /* Add a port to the server, returning the newly allocated port on success, or
or a null pointer on failure. -1 on failure.
The :: and 0.0.0.0 wildcard addresses are treated identically, accepting The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
both IPv4 and IPv6 connections, but :: is the preferred style. This usually both IPv4 and IPv6 connections, but :: is the preferred style. This usually
@ -63,21 +65,29 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
but not dualstack sockets. */ but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */ all of the multiple socket port matching logic in one place */
grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
const void *addr, size_t addr_len); size_t addr_len);
/* Number of fds at the given port_index, or 0 if port_index is out of
bounds. */
unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index);
/* Returns the file descriptor of the Nth listening socket on this server, /* Returns the file descriptor of the Mth (fd_index) listening socket of the Nth
or -1 if the index is out of bounds. (port_index) call to add_port() on this server, or -1 if the indices are out
of bounds. The file descriptor remains owned by the server, and will be
cleaned up when grpc_tcp_server_destroy is called. */
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index);
The file descriptor remains owned by the server, and will be cleaned /* Ref s and return s. */
up when grpc_tcp_server_destroy is called. */ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s);
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, /* Set or reset the shutdown_complete closure. shutdown_complete may be NULL. */
grpc_closure *closure); void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s,
grpc_closure *shutdown_complete);
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener); /* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue
void grpc_tcp_listener_ref(grpc_tcp_listener *listener); a call (exec_ctx!=NULL) to shutdown_complete. */
void grpc_tcp_listener_unref(grpc_tcp_listener *listener); void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */

@ -73,6 +73,7 @@ static gpr_once s_init_max_accept_queue_size;
static int s_max_accept_queue_size; static int s_max_accept_queue_size;
/* one listening port */ /* one listening port */
typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener { struct grpc_tcp_listener {
int fd; int fd;
grpc_fd *emfd; grpc_fd *emfd;
@ -84,9 +85,10 @@ struct grpc_tcp_listener {
} addr; } addr;
size_t addr_len; size_t addr_len;
int port; int port;
unsigned port_index;
unsigned fd_index;
grpc_closure read_closure; grpc_closure read_closure;
grpc_closure destroyed_closure; grpc_closure destroyed_closure;
gpr_refcount refs;
struct grpc_tcp_listener *next; struct grpc_tcp_listener *next;
/* When we add a listener, more than one can be created, mainly because of /* When we add a listener, more than one can be created, mainly because of
IPv6. A sibling will still be in the normal list, but will be flagged IPv6. A sibling will still be in the normal list, but will be flagged
@ -106,6 +108,7 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
/* the overall server */ /* the overall server */
struct grpc_tcp_server { struct grpc_tcp_server {
gpr_refcount refs;
/* Called whenever accept() succeeds on a server port. */ /* Called whenever accept() succeeds on a server port. */
grpc_tcp_server_cb on_accept_cb; grpc_tcp_server_cb on_accept_cb;
void *on_accept_cb_arg; void *on_accept_cb_arg;
@ -122,6 +125,7 @@ struct grpc_tcp_server {
/* linked list of server ports */ /* linked list of server ports */
grpc_tcp_listener *head; grpc_tcp_listener *head;
grpc_tcp_listener *tail;
unsigned nports; unsigned nports;
/* shutdown callback */ /* shutdown callback */
@ -133,28 +137,33 @@ struct grpc_tcp_server {
size_t pollset_count; size_t pollset_count;
}; };
grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu); gpr_mu_init(&s->mu);
s->active_ports = 0; s->active_ports = 0;
s->destroyed_ports = 0; s->destroyed_ports = 0;
s->shutdown = 0; s->shutdown = 0;
s->shutdown_complete = shutdown_complete;
s->on_accept_cb = NULL; s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL; s->on_accept_cb_arg = NULL;
s->head = NULL; s->head = NULL;
s->tail = NULL;
s->nports = 0; s->nports = 0;
return s; return s;
} }
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); if (s->shutdown_complete != NULL) {
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
}
gpr_mu_destroy(&s->mu); gpr_mu_destroy(&s->mu);
while (s->head) { while (s->head) {
grpc_tcp_listener *sp = s->head; grpc_tcp_listener *sp = s->head;
s->head = sp->next; s->head = sp->next;
grpc_tcp_listener_unref(sp); gpr_free(sp);
} }
gpr_free(s); gpr_free(s);
@ -203,15 +212,13 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
} }
} }
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx,
grpc_closure *closure) { grpc_tcp_server *s) {
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown); GPR_ASSERT(!s->shutdown);
s->shutdown = 1; s->shutdown = 1;
s->shutdown_complete = closure;
/* shutdown all fd's */ /* shutdown all fd's */
if (s->active_ports) { if (s->active_ports) {
grpc_tcp_listener *sp; grpc_tcp_listener *sp;
@ -355,7 +362,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
} }
sp->server->on_accept_cb( sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg, exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
grpc_tcp_server_ref(sp->server), sp->port_index, sp->fd_index);
gpr_free(name); gpr_free(name);
gpr_free(addr_str); gpr_free(addr_str);
@ -375,7 +383,9 @@ error:
static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr, const struct sockaddr *addr,
size_t addr_len) { size_t addr_len,
unsigned port_index,
unsigned fd_index) {
grpc_tcp_listener *sp = NULL; grpc_tcp_listener *sp = NULL;
int port; int port;
char *addr_str; char *addr_str;
@ -389,17 +399,23 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
s->nports++; s->nports++;
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
sp = gpr_malloc(sizeof(grpc_tcp_listener)); sp = gpr_malloc(sizeof(grpc_tcp_listener));
sp->next = s->head; sp->next = NULL;
s->head = sp; if (s->head == NULL) {
s->head = sp;
} else {
s->tail->next = sp;
}
s->tail = sp;
sp->server = s; sp->server = s;
sp->fd = fd; sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name); sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len); memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len; sp->addr_len = addr_len;
sp->port = port; sp->port = port;
sp->port_index = port_index;
sp->fd_index = fd_index;
sp->is_sibling = 0; sp->is_sibling = 0;
sp->sibling = NULL; sp->sibling = NULL;
gpr_ref_init(&sp->refs, 1);
GPR_ASSERT(sp->emfd); GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
gpr_free(addr_str); gpr_free(addr_str);
@ -409,8 +425,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
return sp; return sp;
} }
grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
const void *addr, size_t addr_len) { size_t addr_len) {
grpc_tcp_listener *sp; grpc_tcp_listener *sp;
grpc_tcp_listener *sp2 = NULL; grpc_tcp_listener *sp2 = NULL;
int fd; int fd;
@ -423,7 +439,11 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
struct sockaddr_storage sockname_temp; struct sockaddr_storage sockname_temp;
socklen_t sockname_len; socklen_t sockname_len;
int port; int port;
unsigned port_index = 0;
unsigned fd_index = 0;
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
if (((struct sockaddr *)addr)->sa_family == AF_UNIX) { if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
unlink_if_unix_domain_socket(addr); unlink_if_unix_domain_socket(addr);
} }
@ -462,11 +482,13 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
addr = (struct sockaddr *)&wild6; addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6); addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
sp = add_socket_to_server(s, fd, addr, addr_len); sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done; goto done;
} }
if (sp != NULL) {
++fd_index;
}
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
if (port == 0 && sp != NULL) { if (port == 0 && sp != NULL) {
grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port); grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
@ -485,20 +507,47 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
addr = (struct sockaddr *)&addr4_copy; addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy); addr_len = sizeof(addr4_copy);
} }
sp = add_socket_to_server(s, fd, addr, addr_len); sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
if (sp != NULL) sp->sibling = sp2; if (sp2 != NULL) {
if (sp2 != NULL) sp2->is_sibling = 1; if (sp != NULL) {
sp->sibling = sp2;
}
sp2->is_sibling = 1;
}
done: done:
gpr_free(allocated_addr); gpr_free(allocated_addr);
return sp; if (sp != NULL) {
return sp->port;
} else {
return -1;
}
}
unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index) {
unsigned num_fds = 0;
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next) {
if (!sp->is_sibling) {
--port_index;
}
}
for (; sp; sp = sp->sibling, ++num_fds)
;
return num_fds;
} }
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index) {
grpc_tcp_listener *sp; grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--) for (sp = s->head; sp && port_index != 0; sp = sp->next) {
if (!sp->is_sibling) {
--port_index;
}
}
for (; sp && fd_index != 0; sp = sp->sibling, --fd_index)
; ;
if (port_index == 0 && sp) { if (sp) {
return sp->fd; return sp->fd;
} else { } else {
return -1; return -1;
@ -531,31 +580,25 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
if (listener != NULL) { gpr_ref(&s->refs);
grpc_tcp_listener *sp = listener; return s;
return sp->port;
} else {
return 0;
}
} }
void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s,
grpc_tcp_listener *sp = listener; grpc_closure *shutdown_complete) {
gpr_ref(&sp->refs); s->shutdown_complete = shutdown_complete;
} }
void grpc_tcp_listener_unref(grpc_tcp_listener *listener) { void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_tcp_listener *sp = listener; if (gpr_unref(&s->refs)) {
if (sp->is_sibling) return; if (exec_ctx == NULL) {
if (gpr_unref(&sp->refs)) { grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_listener *sibling = sp->sibling; grpc_tcp_server_destroy(&local_exec_ctx, s);
while (sibling) { grpc_exec_ctx_finish(&local_exec_ctx);
sp = sibling; } else {
sibling = sp->sibling; grpc_tcp_server_destroy(exec_ctx, s);
gpr_free(sp);
} }
gpr_free(listener);
} }
} }

@ -65,19 +65,20 @@ struct grpc_tcp_listener {
grpc_winsocket *socket; grpc_winsocket *socket;
/* The actual TCP port number. */ /* The actual TCP port number. */
int port; int port;
unsigned port_index;
grpc_tcp_server *server; grpc_tcp_server *server;
/* The cached AcceptEx for that port. */ /* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx; LPFN_ACCEPTEX AcceptEx;
int shutting_down; int shutting_down;
/* closure for socket notification of accept being ready */ /* closure for socket notification of accept being ready */
grpc_closure on_accept; grpc_closure on_accept;
gpr_refcount refs;
/* linked list */ /* linked list */
struct grpc_tcp_listener *next; struct grpc_tcp_listener *next;
}; };
/* the overall server */ /* the overall server */
struct grpc_tcp_server { struct grpc_tcp_server {
gpr_refcount refs;
/* Called whenever accept() succeeds on a server port. */ /* Called whenever accept() succeeds on a server port. */
grpc_tcp_server_cb on_accept_cb; grpc_tcp_server_cb on_accept_cb;
void *on_accept_cb_arg; void *on_accept_cb_arg;
@ -89,6 +90,7 @@ struct grpc_tcp_server {
/* linked list of server ports */ /* linked list of server ports */
grpc_tcp_listener *head; grpc_tcp_listener *head;
grpc_tcp_listener *tail;
/* shutdown callback */ /* shutdown callback */
grpc_closure *shutdown_complete; grpc_closure *shutdown_complete;
@ -96,21 +98,23 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a /* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */ grpc_tcp_server. */
grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu); gpr_mu_init(&s->mu);
s->active_ports = 0; s->active_ports = 0;
s->on_accept_cb = NULL; s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL; s->on_accept_cb_arg = NULL;
s->head = NULL; s->head = NULL;
s->shutdown_complete = NULL; s->tail = NULL;
s->shutdown_complete = shutdown_complete;
return s; return s;
} }
static void dont_care_about_shutdown_completion(void *arg) {}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); if (s->shutdown_complete != NULL) {
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
}
/* Now that the accepts have been aborted, we can destroy the sockets. /* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already The IOCP won't get notified on these, so we can flag them as already
@ -125,15 +129,17 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s); gpr_free(s);
} }
/* Public function. Stops and destroys a grpc_tcp_server. */ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, gpr_ref(&s->refs);
grpc_closure *shutdown_complete) { return s;
}
static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
int immediately_done = 0; int immediately_done = 0;
grpc_tcp_listener *sp; grpc_tcp_listener *sp;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
s->shutdown_complete = shutdown_complete;
/* First, shutdown all fd's. This will queue abortion calls for all /* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */ of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) { if (s->active_ports == 0) {
@ -150,6 +156,23 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
} }
} }
void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s,
grpc_closure *shutdown_complete) {
s->shutdown_complete = shutdown_complete;
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
if (exec_ctx == NULL) {
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server_destroy(&local_exec_ctx, s);
grpc_exec_ctx_finish(&local_exec_ctx);
} else {
grpc_tcp_server_destroy(exec_ctx, s);
}
}
}
/* Prepare (bind) a recently-created socket for listening. */ /* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr, static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
size_t addr_len) { size_t addr_len) {
@ -343,7 +366,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
/* The only time we should call our callback, is where we successfully /* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */ managed to accept a connection, and created an endpoint. */
if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep); if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep,
sp->server, sp->port_index, 0);
/* As we were notified from the IOCP of one and exactly one accept, /* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next to the new connection. We need to create a new one for the next
@ -353,7 +377,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
const struct sockaddr *addr, const struct sockaddr *addr,
size_t addr_len) { size_t addr_len,
unsigned port_index) {
grpc_tcp_listener *sp = NULL; grpc_tcp_listener *sp = NULL;
int port; int port;
int status; int status;
@ -382,14 +407,20 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
sp = gpr_malloc(sizeof(grpc_tcp_listener)); sp = gpr_malloc(sizeof(grpc_tcp_listener));
sp->next = s->head; sp->next = NULL;
s->head = sp; if (s->head == NULL) {
s->head = sp;
} else {
s->tail->next = sp;
}
s->tail = sp;
sp->server = s; sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener"); sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0; sp->shutting_down = 0;
sp->AcceptEx = AcceptEx; sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET; sp->new_socket = INVALID_SOCKET;
sp->port = port; sp->port = port;
sp->port_index = port_index;
gpr_ref_init(&sp->refs, 1); gpr_ref_init(&sp->refs, 1);
grpc_closure_init(&sp->on_accept, on_accept, sp); grpc_closure_init(&sp->on_accept, on_accept, sp);
GPR_ASSERT(sp->socket); GPR_ASSERT(sp->socket);
@ -399,8 +430,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
return sp; return sp;
} }
grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
const void *addr, size_t addr_len) { size_t addr_len) {
grpc_tcp_listener *sp; grpc_tcp_listener *sp;
SOCKET sock; SOCKET sock;
struct sockaddr_in6 addr6_v4mapped; struct sockaddr_in6 addr6_v4mapped;
@ -409,6 +440,10 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
struct sockaddr_storage sockname_temp; struct sockaddr_storage sockname_temp;
socklen_t sockname_len; socklen_t sockname_len;
int port; int port;
unsigned port_index = 0;
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
/* Check if this is a wildcard port, and if so, try to keep the port the same /* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */ as some previously created listener. */
@ -450,17 +485,37 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
gpr_free(utf8_message); gpr_free(utf8_message);
} }
sp = add_socket_to_server(s, sock, addr, addr_len); sp = add_socket_to_server(s, sock, addr, addr_len, port_index);
gpr_free(allocated_addr); gpr_free(allocated_addr);
return sp; if (sp) {
return sp->port;
} else {
return -1;
}
}
unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, int port_index) {
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
;
if (sp) {
return 1;
} else {
return 0;
}
} }
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index) {
grpc_tcp_listener *sp; grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--) if (fd_index != 0) {
/* Windows implementation has only one fd per port_index. */
return -1;
}
for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
; ;
if (port_index == 0 && sp) { if (sp) {
return _open_osfhandle(sp->socket->socket, 0); return _open_osfhandle(sp->socket->socket, 0);
} else { } else {
return -1; return -1;
@ -485,25 +540,4 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
if (listener != NULL) {
grpc_tcp_listener *sp = listener;
return sp->port;
} else {
return 0;
}
}
void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
gpr_ref(&sp->refs);
}
void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
if (gpr_unref(&sp->refs)) {
gpr_free(listener);
}
}
#endif /* GPR_WINSOCK_SOCKET */ #endif /* GPR_WINSOCK_SOCKET */

@ -126,8 +126,10 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
state_unref(state); state_unref(state);
} }
static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
grpc_endpoint *tcp) { grpc_tcp_server *from_server, unsigned port_index,
unsigned fd_index) {
grpc_tcp_server_unref(NULL, from_server);
grpc_server_secure_state *state = statep; grpc_server_secure_state *state = statep;
state_ref(state); state_ref(state);
grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp, grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp,
@ -144,8 +146,10 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) { static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) {
grpc_server_secure_state *state = statep; grpc_server_secure_state *state = statep;
state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, if (state->destroy_callback != NULL) {
success); state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
success);
}
grpc_security_connector_shutdown(exec_ctx, state->sc); grpc_security_connector_shutdown(exec_ctx, state->sc);
state_unref(state); state_unref(state);
} }
@ -161,8 +165,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
state->destroy_callback = callback; state->destroy_callback = callback;
tcp = state->tcp; tcp = state->tcp;
gpr_mu_unlock(&state->mu); gpr_mu_unlock(&state->mu);
grpc_closure_init(&state->destroy_closure, destroy_done, state); grpc_tcp_server_unref(exec_ctx, tcp);
grpc_tcp_server_destroy(exec_ctx, tcp, &state->destroy_closure);
} }
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
@ -199,18 +202,18 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
if (!resolved) { if (!resolved) {
goto error; goto error;
} }
state = gpr_malloc(sizeof(*state));
tcp = grpc_tcp_server_create(); memset(state, 0, sizeof(*state));
grpc_closure_init(&state->destroy_closure, destroy_done, state);
tcp = grpc_tcp_server_create(&state->destroy_closure);
if (!tcp) { if (!tcp) {
goto error; goto error;
} }
for (i = 0; i < resolved->naddrs; i++) { for (i = 0; i < resolved->naddrs; i++) {
grpc_tcp_listener *listener; port_temp = grpc_tcp_server_add_port(
listener = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr, tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len); resolved->addrs[i].len);
port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp > 0) { if (port_temp > 0) {
if (port_num == -1) { if (port_num == -1) {
port_num = port_temp; port_num = port_temp;
@ -232,8 +235,6 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
} }
grpc_resolved_addresses_destroy(resolved); grpc_resolved_addresses_destroy(resolved);
state = gpr_malloc(sizeof(*state));
memset(state, 0, sizeof(*state));
state->server = server; state->server = server;
state->tcp = tcp; state->tcp = tcp;
state->sc = sc; state->sc = sc;
@ -258,7 +259,7 @@ error:
grpc_resolved_addresses_destroy(resolved); grpc_resolved_addresses_destroy(resolved);
} }
if (tcp) { if (tcp) {
grpc_tcp_server_destroy(&exec_ctx, tcp, NULL); grpc_tcp_server_unref(&exec_ctx, tcp);
} }
if (state) { if (state) {
gpr_free(state); gpr_free(state);

@ -53,7 +53,8 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
} }
static void new_transport(grpc_exec_ctx *exec_ctx, void *server, static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
grpc_endpoint *tcp) { grpc_endpoint *tcp, grpc_tcp_server *tcp_server,
unsigned port_index, unsigned fd_index) {
/* /*
* Beware that the call to grpc_create_chttp2_transport() has to happen before * Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code * grpc_tcp_server_destroy(). This is fine here, but similar code
@ -65,6 +66,7 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
exec_ctx, grpc_server_get_channel_args(server), tcp, 0); exec_ctx, grpc_server_get_channel_args(server), tcp, 0);
setup_transport(exec_ctx, server, transport); setup_transport(exec_ctx, server, transport);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
grpc_tcp_server_unref(exec_ctx, tcp_server);
} }
/* Server callback: start listening on our ports */ /* Server callback: start listening on our ports */
@ -80,7 +82,8 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) { grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp; grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_destroy(exec_ctx, tcp, destroy_done); grpc_tcp_server_set_shutdown_complete(tcp, destroy_done);
grpc_tcp_server_unref(exec_ctx, tcp);
} }
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
@ -100,15 +103,13 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error; goto error;
} }
tcp = grpc_tcp_server_create(); tcp = grpc_tcp_server_create(NULL);
GPR_ASSERT(tcp); GPR_ASSERT(tcp);
for (i = 0; i < resolved->naddrs; i++) { for (i = 0; i < resolved->naddrs; i++) {
grpc_tcp_listener *listener; port_temp = grpc_tcp_server_add_port(
listener = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr, tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len); resolved->addrs[i].len);
port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp > 0) { if (port_temp > 0) {
if (port_num == -1) { if (port_num == -1) {
port_num = port_temp; port_num = port_temp;
@ -139,7 +140,7 @@ error:
grpc_resolved_addresses_destroy(resolved); grpc_resolved_addresses_destroy(resolved);
} }
if (tcp) { if (tcp) {
grpc_tcp_server_destroy(&exec_ctx, tcp, NULL); grpc_tcp_server_unref(&exec_ctx, tcp);
} }
port_num = 0; port_num = 0;

@ -78,8 +78,11 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
} }
} }
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) { static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_tcp_server *tcp_server, unsigned port_index,
unsigned fd_index) {
test_tcp_server *server = arg; test_tcp_server *server = arg;
grpc_tcp_server_unref(NULL, tcp_server);
grpc_closure_init(&on_read, handle_read, NULL); grpc_closure_init(&on_read, handle_read, NULL);
gpr_slice_buffer_init(&state.incoming_buffer); gpr_slice_buffer_init(&state.incoming_buffer);
gpr_slice_buffer_init(&state.temp_incoming_buffer); gpr_slice_buffer_init(&state.temp_incoming_buffer);

@ -48,61 +48,70 @@
static grpc_pollset g_pollset; static grpc_pollset g_pollset;
static int g_nconnects = 0; static int g_nconnects = 0;
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) { struct on_connect_result {
int server_fd;
};
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_tcp_server *tcp_server, unsigned port_index,
unsigned fd_index) {
struct on_connect_result *result = arg;
grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_destroy(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
result->server_fd = grpc_tcp_server_get_fd(tcp_server, port_index, fd_index);
g_nconnects++; g_nconnects++;
grpc_pollset_kick(&g_pollset, NULL); grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_tcp_server_unref(exec_ctx, tcp_server);
} }
static void test_no_op(void) { static void test_no_op(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server *s = grpc_tcp_server_create(); grpc_tcp_server *s = grpc_tcp_server_create(NULL);
grpc_tcp_server_destroy(&exec_ctx, s, NULL); grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
static void test_no_op_with_start(void) { static void test_no_op_with_start(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server *s = grpc_tcp_server_create(); grpc_tcp_server *s = grpc_tcp_server_create(NULL);
LOG_TEST("test_no_op_with_start"); LOG_TEST("test_no_op_with_start");
grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL); grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
grpc_tcp_server_destroy(&exec_ctx, s, NULL); grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
static void test_no_op_with_port(void) { static void test_no_op_with_port(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr; struct sockaddr_in addr;
grpc_tcp_server *s = grpc_tcp_server_create(); grpc_tcp_server *s = grpc_tcp_server_create(NULL);
LOG_TEST("test_no_op_with_port"); LOG_TEST("test_no_op_with_port");
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
GPR_ASSERT( GPR_ASSERT(
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)) > 0);
grpc_tcp_server_destroy(&exec_ctx, s, NULL); grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
static void test_no_op_with_port_and_start(void) { static void test_no_op_with_port_and_start(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr; struct sockaddr_in addr;
grpc_tcp_server *s = grpc_tcp_server_create(); grpc_tcp_server *s = grpc_tcp_server_create(NULL);
LOG_TEST("test_no_op_with_port_and_start"); LOG_TEST("test_no_op_with_port_and_start");
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
GPR_ASSERT( GPR_ASSERT(
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)) > 0);
grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL); grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
grpc_tcp_server_destroy(&exec_ctx, s, NULL); grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
@ -111,32 +120,40 @@ static void test_connect(int n) {
struct sockaddr_storage addr; struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr); socklen_t addr_len = sizeof(addr);
int svrfd, clifd; int svrfd, clifd;
grpc_tcp_server *s = grpc_tcp_server_create(); grpc_tcp_server *s = grpc_tcp_server_create(NULL);
int nconnects_before; int nconnects_before;
gpr_timespec deadline; gpr_timespec deadline;
grpc_pollset *pollsets[1]; grpc_pollset *pollsets[1];
int i; int i;
struct on_connect_result result;
LOG_TEST("test_connect"); LOG_TEST("test_connect");
gpr_log(GPR_INFO, "clients=%d", n); gpr_log(GPR_INFO, "clients=%d", n);
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET; addr.ss_family = AF_INET;
GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len)); GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len) >
0);
svrfd = grpc_tcp_server_get_fd(s, 0); GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 2) == 0);
GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 1) == 0);
GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 0) == 1);
GPR_ASSERT(grpc_tcp_server_get_fd(s, 0, 1) < 0);
GPR_ASSERT(grpc_tcp_server_get_fd(s, 0, 2) < 0);
GPR_ASSERT(grpc_tcp_server_get_fd(s, 2, 0) < 0);
GPR_ASSERT(grpc_tcp_server_get_fd(s, 1, 0) < 0);
svrfd = grpc_tcp_server_get_fd(s, 0, 0);
GPR_ASSERT(svrfd >= 0); GPR_ASSERT(svrfd >= 0);
GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0);
GPR_ASSERT(addr_len <= sizeof(addr)); GPR_ASSERT(addr_len <= sizeof(addr));
pollsets[0] = &g_pollset; pollsets[0] = &g_pollset;
grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, NULL); grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, &result);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (i = 0; i < n; i++) { for (i = 0; i < n; i++) {
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
nconnects_before = g_nconnects; nconnects_before = g_nconnects;
result.server_fd = -1;
clifd = socket(addr.ss_family, SOCK_STREAM, 0); clifd = socket(addr.ss_family, SOCK_STREAM, 0);
GPR_ASSERT(clifd >= 0); GPR_ASSERT(clifd >= 0);
gpr_log(GPR_DEBUG, "start connect"); gpr_log(GPR_DEBUG, "start connect");
@ -156,11 +173,12 @@ static void test_connect(int n) {
GPR_ASSERT(g_nconnects == nconnects_before + 1); GPR_ASSERT(g_nconnects == nconnects_before + 1);
close(clifd); close(clifd);
GPR_ASSERT(svrfd == result.server_fd);
} }
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_tcp_server_destroy(&exec_ctx, s, NULL); grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }

@ -66,12 +66,15 @@ static void pretty_print_backoffs(reconnect_server *server) {
} }
} }
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) { static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_tcp_server *tcp_server, unsigned port_index,
unsigned fd_index) {
char *peer; char *peer;
char *last_colon; char *last_colon;
reconnect_server *server = (reconnect_server *)arg; reconnect_server *server = (reconnect_server *)arg;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
timestamp_list *new_tail; timestamp_list *new_tail;
grpc_tcp_server_unref(NULL, tcp_server);
peer = grpc_endpoint_get_peer(tcp); peer = grpc_endpoint_get_peer(tcp);
grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_destroy(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp);

@ -58,7 +58,6 @@ void test_tcp_server_init(test_tcp_server *server,
void test_tcp_server_start(test_tcp_server *server, int port) { void test_tcp_server_start(test_tcp_server *server, int port) {
struct sockaddr_in addr; struct sockaddr_in addr;
grpc_tcp_listener *listener;
int port_added; int port_added;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -66,9 +65,9 @@ void test_tcp_server_start(test_tcp_server *server, int port) {
addr.sin_port = htons((uint16_t)port); addr.sin_port = htons((uint16_t)port);
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr)); memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
server->tcp_server = grpc_tcp_server_create(); server->tcp_server = grpc_tcp_server_create(NULL);
listener = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); port_added =
port_added = grpc_tcp_listener_get_port(listener); grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr));
GPR_ASSERT(port_added == port); GPR_ASSERT(port_added == port);
grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1, grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1,
@ -106,7 +105,9 @@ void test_tcp_server_destroy(test_tcp_server *server) {
grpc_closure do_nothing_cb; grpc_closure do_nothing_cb;
grpc_closure_init(&server_shutdown_cb, on_server_destroyed, server); grpc_closure_init(&server_shutdown_cb, on_server_destroyed, server);
grpc_closure_init(&do_nothing_cb, do_nothing, NULL); grpc_closure_init(&do_nothing_cb, do_nothing, NULL);
grpc_tcp_server_destroy(&exec_ctx, server->tcp_server, &server_shutdown_cb); grpc_tcp_server_set_shutdown_complete(server->tcp_server,
&server_shutdown_cb);
grpc_tcp_server_unref(&exec_ctx, server->tcp_server);
shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(5, GPR_TIMESPAN)); gpr_time_from_seconds(5, GPR_TIMESPAN));
while (!server->shutdown && while (!server->shutdown &&

Loading…
Cancel
Save