From fa6b606898b18c22cdcbaa006338fe1d57d8f93f Mon Sep 17 00:00:00 2001 From: Dan Born Date: Fri, 8 Jan 2016 21:01:59 -0800 Subject: [PATCH 01/17] Make grpc_tcp_listener private. --- src/core/iomgr/tcp_server.h | 52 ++++--- src/core/iomgr/tcp_server_posix.c | 133 ++++++++++++------ src/core/iomgr/tcp_server_windows.c | 120 ++++++++++------ src/core/security/server_secure_chttp2.c | 29 ++-- src/core/surface/server_chttp2.c | 15 +- .../set_initial_connect_string_test.c | 5 +- test/core/iomgr/tcp_server_posix_test.c | 54 ++++--- test/core/util/reconnect_server.c | 5 +- test/core/util/test_tcp_server.c | 11 +- 9 files changed, 269 insertions(+), 155 deletions(-) diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 3294e137974..92e504d20be 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -39,23 +39,25 @@ /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; -/* Forward decl of grpc_tcp_listener */ -typedef struct grpc_tcp_listener grpc_tcp_listener; - -/* Called for newly connected TCP connections. */ +/* Called for newly connected TCP connections. Callee owns a ref on + from_server. */ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, - grpc_endpoint *ep); + grpc_endpoint *ep, + grpc_tcp_server *from_server, + unsigned port_index, unsigned fd_index); -/* Create a server, initially not bound to any ports */ -grpc_tcp_server *grpc_tcp_server_create(void); +/* Create a server, initially not bound to any ports. The caller owns one ref. + If shutdown_complete is not NULL, it will be used by + grpc_tcp_server_unref(). */ +grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete); /* Start listening to bound ports */ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_pollset **pollsets, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void *cb_arg); -/* Add a port to the server, returning the newly created listener on success, - or a null pointer on failure. +/* Add a port to the server, returning the newly allocated port on success, or + -1 on failure. The :: and 0.0.0.0 wildcard addresses are treated identically, accepting both IPv4 and IPv6 connections, but :: is the preferred style. This usually @@ -63,21 +65,29 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, but not dualstack sockets. */ /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle all of the multiple socket port matching logic in one place */ -grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, - const void *addr, size_t addr_len); +int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + size_t addr_len); + +/* Number of fds at the given port_index, or 0 if port_index is out of + bounds. */ +unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index); -/* Returns the file descriptor of the Nth listening socket on this server, - or -1 if the index is out of bounds. +/* Returns the file descriptor of the Mth (fd_index) listening socket of the Nth + (port_index) call to add_port() on this server, or -1 if the indices are out + of bounds. The file descriptor remains owned by the server, and will be + cleaned up when grpc_tcp_server_destroy is called. */ +int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index, + unsigned fd_index); - The file descriptor remains owned by the server, and will be cleaned - up when grpc_tcp_server_destroy is called. */ -int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index); +/* Ref s and return s. */ +grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s); -void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, - grpc_closure *closure); +/* Set or reset the shutdown_complete closure. shutdown_complete may be NULL. */ +void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s, + grpc_closure *shutdown_complete); -int grpc_tcp_listener_get_port(grpc_tcp_listener *listener); -void grpc_tcp_listener_ref(grpc_tcp_listener *listener); -void grpc_tcp_listener_unref(grpc_tcp_listener *listener); +/* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue + a call (exec_ctx!=NULL) to shutdown_complete. */ +void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 49e83cf6aeb..f1119f689ad 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -73,6 +73,7 @@ static gpr_once s_init_max_accept_queue_size; static int s_max_accept_queue_size; /* one listening port */ +typedef struct grpc_tcp_listener grpc_tcp_listener; struct grpc_tcp_listener { int fd; grpc_fd *emfd; @@ -84,9 +85,10 @@ struct grpc_tcp_listener { } addr; size_t addr_len; int port; + unsigned port_index; + unsigned fd_index; grpc_closure read_closure; grpc_closure destroyed_closure; - gpr_refcount refs; struct grpc_tcp_listener *next; /* When we add a listener, more than one can be created, mainly because of IPv6. A sibling will still be in the normal list, but will be flagged @@ -106,6 +108,7 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { /* the overall server */ struct grpc_tcp_server { + gpr_refcount refs; /* Called whenever accept() succeeds on a server port. */ grpc_tcp_server_cb on_accept_cb; void *on_accept_cb_arg; @@ -122,6 +125,7 @@ struct grpc_tcp_server { /* linked list of server ports */ grpc_tcp_listener *head; + grpc_tcp_listener *tail; unsigned nports; /* shutdown callback */ @@ -133,28 +137,33 @@ struct grpc_tcp_server { size_t pollset_count; }; -grpc_tcp_server *grpc_tcp_server_create(void) { +grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; + s->shutdown_complete = shutdown_complete; s->on_accept_cb = NULL; s->on_accept_cb_arg = NULL; s->head = NULL; + s->tail = NULL; s->nports = 0; return s; } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); + } gpr_mu_destroy(&s->mu); while (s->head) { grpc_tcp_listener *sp = s->head; s->head = sp->next; - grpc_tcp_listener_unref(sp); + gpr_free(sp); } gpr_free(s); @@ -203,15 +212,13 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, - grpc_closure *closure) { +static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) { gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); s->shutdown = 1; - s->shutdown_complete = closure; - /* shutdown all fd's */ if (s->active_ports) { grpc_tcp_listener *sp; @@ -355,7 +362,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { } sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), + grpc_tcp_server_ref(sp->server), sp->port_index, sp->fd_index); gpr_free(name); gpr_free(addr_str); @@ -375,7 +383,9 @@ error: static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, const struct sockaddr *addr, - size_t addr_len) { + size_t addr_len, + unsigned port_index, + unsigned fd_index) { grpc_tcp_listener *sp = NULL; int port; char *addr_str; @@ -389,17 +399,23 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, s->nports++; GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); sp = gpr_malloc(sizeof(grpc_tcp_listener)); - sp->next = s->head; - s->head = sp; + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; + } + s->tail = sp; sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; sp->port = port; + sp->port_index = port_index; + sp->fd_index = fd_index; sp->is_sibling = 0; sp->sibling = NULL; - gpr_ref_init(&sp->refs, 1); GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(addr_str); @@ -409,8 +425,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, return sp; } -grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, - const void *addr, size_t addr_len) { +int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + size_t addr_len) { grpc_tcp_listener *sp; grpc_tcp_listener *sp2 = NULL; int fd; @@ -423,7 +439,11 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, struct sockaddr_storage sockname_temp; socklen_t sockname_len; int port; - + unsigned port_index = 0; + unsigned fd_index = 0; + if (s->tail != NULL) { + port_index = s->tail->port_index + 1; + } if (((struct sockaddr *)addr)->sa_family == AF_UNIX) { unlink_if_unix_domain_socket(addr); } @@ -462,11 +482,13 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, addr = (struct sockaddr *)&wild6; addr_len = sizeof(wild6); fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); - sp = add_socket_to_server(s, fd, addr, addr_len); + sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } - + if (sp != NULL) { + ++fd_index; + } /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ if (port == 0 && sp != NULL) { grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port); @@ -485,20 +507,47 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, addr = (struct sockaddr *)&addr4_copy; addr_len = sizeof(addr4_copy); } - sp = add_socket_to_server(s, fd, addr, addr_len); - if (sp != NULL) sp->sibling = sp2; - if (sp2 != NULL) sp2->is_sibling = 1; + sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index); + if (sp2 != NULL) { + if (sp != NULL) { + sp->sibling = sp2; + } + sp2->is_sibling = 1; + } done: gpr_free(allocated_addr); - return sp; + if (sp != NULL) { + return sp->port; + } else { + return -1; + } +} + +unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index) { + unsigned num_fds = 0; + grpc_tcp_listener *sp; + for (sp = s->head; sp && port_index != 0; sp = sp->next) { + if (!sp->is_sibling) { + --port_index; + } + } + for (; sp; sp = sp->sibling, ++num_fds) + ; + return num_fds; } -int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { +int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index, + unsigned fd_index) { grpc_tcp_listener *sp; - for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--) + for (sp = s->head; sp && port_index != 0; sp = sp->next) { + if (!sp->is_sibling) { + --port_index; + } + } + for (; sp && fd_index != 0; sp = sp->sibling, --fd_index) ; - if (port_index == 0 && sp) { + if (sp) { return sp->fd; } else { return -1; @@ -531,31 +580,25 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, gpr_mu_unlock(&s->mu); } -int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { - if (listener != NULL) { - grpc_tcp_listener *sp = listener; - return sp->port; - } else { - return 0; - } +grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { + gpr_ref(&s->refs); + return s; } -void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { - grpc_tcp_listener *sp = listener; - gpr_ref(&sp->refs); +void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s, + grpc_closure *shutdown_complete) { + s->shutdown_complete = shutdown_complete; } -void grpc_tcp_listener_unref(grpc_tcp_listener *listener) { - grpc_tcp_listener *sp = listener; - if (sp->is_sibling) return; - if (gpr_unref(&sp->refs)) { - grpc_tcp_listener *sibling = sp->sibling; - while (sibling) { - sp = sibling; - sibling = sp->sibling; - gpr_free(sp); +void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + if (gpr_unref(&s->refs)) { + if (exec_ctx == NULL) { + grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp_server_destroy(&local_exec_ctx, s); + grpc_exec_ctx_finish(&local_exec_ctx); + } else { + grpc_tcp_server_destroy(exec_ctx, s); } - gpr_free(listener); } } diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index d38fd8860a1..0435f5005bb 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -65,19 +65,20 @@ struct grpc_tcp_listener { grpc_winsocket *socket; /* The actual TCP port number. */ int port; + unsigned port_index; grpc_tcp_server *server; /* The cached AcceptEx for that port. */ LPFN_ACCEPTEX AcceptEx; int shutting_down; /* closure for socket notification of accept being ready */ grpc_closure on_accept; - gpr_refcount refs; /* linked list */ struct grpc_tcp_listener *next; }; /* the overall server */ struct grpc_tcp_server { + gpr_refcount refs; /* Called whenever accept() succeeds on a server port. */ grpc_tcp_server_cb on_accept_cb; void *on_accept_cb_arg; @@ -89,6 +90,7 @@ struct grpc_tcp_server { /* linked list of server ports */ grpc_tcp_listener *head; + grpc_tcp_listener *tail; /* shutdown callback */ grpc_closure *shutdown_complete; @@ -96,21 +98,23 @@ struct grpc_tcp_server { /* Public function. Allocates the proper data structures to hold a grpc_tcp_server. */ -grpc_tcp_server *grpc_tcp_server_create(void) { +grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); s->active_ports = 0; s->on_accept_cb = NULL; s->on_accept_cb_arg = NULL; s->head = NULL; - s->shutdown_complete = NULL; + s->tail = NULL; + s->shutdown_complete = shutdown_complete; return s; } -static void dont_care_about_shutdown_completion(void *arg) {} - static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); + } /* Now that the accepts have been aborted, we can destroy the sockets. The IOCP won't get notified on these, so we can flag them as already @@ -125,15 +129,17 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(s); } -/* Public function. Stops and destroys a grpc_tcp_server. */ -void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, - grpc_closure *shutdown_complete) { +grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { + gpr_ref(&s->refs); + return s; +} + +static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) { int immediately_done = 0; grpc_tcp_listener *sp; gpr_mu_lock(&s->mu); - s->shutdown_complete = shutdown_complete; - /* First, shutdown all fd's. This will queue abortion calls for all of the pending accepts due to the normal operation mechanism. */ if (s->active_ports == 0) { @@ -150,6 +156,23 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, } } +void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s, + grpc_closure *shutdown_complete) { + s->shutdown_complete = shutdown_complete; +} + +void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + if (gpr_unref(&s->refs)) { + if (exec_ctx == NULL) { + grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp_server_destroy(&local_exec_ctx, s); + grpc_exec_ctx_finish(&local_exec_ctx); + } else { + grpc_tcp_server_destroy(exec_ctx, s); + } + } +} + /* Prepare (bind) a recently-created socket for listening. */ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, size_t addr_len) { @@ -343,7 +366,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ - if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep); + if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, + sp->server, sp->port_index, 0); /* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next @@ -353,7 +377,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, const struct sockaddr *addr, - size_t addr_len) { + size_t addr_len, + unsigned port_index) { grpc_tcp_listener *sp = NULL; int port; int status; @@ -382,14 +407,20 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, gpr_mu_lock(&s->mu); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); sp = gpr_malloc(sizeof(grpc_tcp_listener)); - sp->next = s->head; - s->head = sp; + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; + } + s->tail = sp; sp->server = s; sp->socket = grpc_winsocket_create(sock, "listener"); sp->shutting_down = 0; sp->AcceptEx = AcceptEx; sp->new_socket = INVALID_SOCKET; sp->port = port; + sp->port_index = port_index; gpr_ref_init(&sp->refs, 1); grpc_closure_init(&sp->on_accept, on_accept, sp); GPR_ASSERT(sp->socket); @@ -399,8 +430,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, return sp; } -grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, - const void *addr, size_t addr_len) { +int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + size_t addr_len) { grpc_tcp_listener *sp; SOCKET sock; struct sockaddr_in6 addr6_v4mapped; @@ -409,6 +440,10 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, struct sockaddr_storage sockname_temp; socklen_t sockname_len; int port; + unsigned port_index = 0; + if (s->tail != NULL) { + port_index = s->tail->port_index + 1; + } /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ @@ -450,17 +485,37 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, gpr_free(utf8_message); } - sp = add_socket_to_server(s, sock, addr, addr_len); + sp = add_socket_to_server(s, sock, addr, addr_len, port_index); gpr_free(allocated_addr); - return sp; + if (sp) { + return sp->port; + } else { + return -1; + } +} + +unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, int port_index) { + grpc_tcp_listener *sp; + for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index) + ; + if (sp) { + return 1; + } else { + return 0; + } } -int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { +int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index, + unsigned fd_index) { grpc_tcp_listener *sp; - for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--) + if (fd_index != 0) { + /* Windows implementation has only one fd per port_index. */ + return -1; + } + for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index) ; - if (port_index == 0 && sp) { + if (sp) { return _open_osfhandle(sp->socket->socket, 0); } else { return -1; @@ -485,25 +540,4 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, gpr_mu_unlock(&s->mu); } -int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { - if (listener != NULL) { - grpc_tcp_listener *sp = listener; - return sp->port; - } else { - return 0; - } -} - -void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { - grpc_tcp_listener *sp = listener; - gpr_ref(&sp->refs); -} - -void grpc_tcp_listener_unref(grpc_tcp_listener *listener) { - grpc_tcp_listener *sp = listener; - if (gpr_unref(&sp->refs)) { - gpr_free(listener); - } -} - #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index d7fad338549..f4f3ff49218 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -126,8 +126,10 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, state_unref(state); } -static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, - grpc_endpoint *tcp) { +static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, + grpc_tcp_server *from_server, unsigned port_index, + unsigned fd_index) { + grpc_tcp_server_unref(NULL, from_server); grpc_server_secure_state *state = statep; state_ref(state); grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp, @@ -144,8 +146,10 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) { grpc_server_secure_state *state = statep; - state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, - success); + if (state->destroy_callback != NULL) { + state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, + success); + } grpc_security_connector_shutdown(exec_ctx, state->sc); state_unref(state); } @@ -161,8 +165,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, state->destroy_callback = callback; tcp = state->tcp; gpr_mu_unlock(&state->mu); - grpc_closure_init(&state->destroy_closure, destroy_done, state); - grpc_tcp_server_destroy(exec_ctx, tcp, &state->destroy_closure); + grpc_tcp_server_unref(exec_ctx, tcp); } int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, @@ -199,18 +202,18 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, if (!resolved) { goto error; } - - tcp = grpc_tcp_server_create(); + state = gpr_malloc(sizeof(*state)); + memset(state, 0, sizeof(*state)); + grpc_closure_init(&state->destroy_closure, destroy_done, state); + tcp = grpc_tcp_server_create(&state->destroy_closure); if (!tcp) { goto error; } for (i = 0; i < resolved->naddrs; i++) { - grpc_tcp_listener *listener; - listener = grpc_tcp_server_add_port( + port_temp = grpc_tcp_server_add_port( tcp, (struct sockaddr *)&resolved->addrs[i].addr, resolved->addrs[i].len); - port_temp = grpc_tcp_listener_get_port(listener); if (port_temp > 0) { if (port_num == -1) { port_num = port_temp; @@ -232,8 +235,6 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, } grpc_resolved_addresses_destroy(resolved); - state = gpr_malloc(sizeof(*state)); - memset(state, 0, sizeof(*state)); state->server = server; state->tcp = tcp; state->sc = sc; @@ -258,7 +259,7 @@ error: grpc_resolved_addresses_destroy(resolved); } if (tcp) { - grpc_tcp_server_destroy(&exec_ctx, tcp, NULL); + grpc_tcp_server_unref(&exec_ctx, tcp); } if (state) { gpr_free(state); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 5ce7c1955b8..f0b9f211b3d 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -53,7 +53,8 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *server, } static void new_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_endpoint *tcp) { + grpc_endpoint *tcp, grpc_tcp_server *tcp_server, + unsigned port_index, unsigned fd_index) { /* * Beware that the call to grpc_create_chttp2_transport() has to happen before * grpc_tcp_server_destroy(). This is fine here, but similar code @@ -65,6 +66,7 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server, exec_ctx, grpc_server_get_channel_args(server), tcp, 0); setup_transport(exec_ctx, server, transport); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); + grpc_tcp_server_unref(exec_ctx, tcp_server); } /* Server callback: start listening on our ports */ @@ -80,7 +82,8 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_destroy(exec_ctx, tcp, destroy_done); + grpc_tcp_server_set_shutdown_complete(tcp, destroy_done); + grpc_tcp_server_unref(exec_ctx, tcp); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { @@ -100,15 +103,13 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { goto error; } - tcp = grpc_tcp_server_create(); + tcp = grpc_tcp_server_create(NULL); GPR_ASSERT(tcp); for (i = 0; i < resolved->naddrs; i++) { - grpc_tcp_listener *listener; - listener = grpc_tcp_server_add_port( + port_temp = grpc_tcp_server_add_port( tcp, (struct sockaddr *)&resolved->addrs[i].addr, resolved->addrs[i].len); - port_temp = grpc_tcp_listener_get_port(listener); if (port_temp > 0) { if (port_num == -1) { port_num = port_temp; @@ -139,7 +140,7 @@ error: grpc_resolved_addresses_destroy(resolved); } if (tcp) { - grpc_tcp_server_destroy(&exec_ctx, tcp, NULL); + grpc_tcp_server_unref(&exec_ctx, tcp); } port_num = 0; diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index ceca56c833b..68832a4b4e5 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -78,8 +78,11 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { } } -static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) { +static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, + grpc_tcp_server *tcp_server, unsigned port_index, + unsigned fd_index) { test_tcp_server *server = arg; + grpc_tcp_server_unref(NULL, tcp_server); grpc_closure_init(&on_read, handle_read, NULL); gpr_slice_buffer_init(&state.incoming_buffer); gpr_slice_buffer_init(&state.temp_incoming_buffer); diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 530381e37fe..2791087b8a0 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -48,61 +48,70 @@ static grpc_pollset g_pollset; static int g_nconnects = 0; -static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) { +struct on_connect_result { + int server_fd; +}; + +static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, + grpc_tcp_server *tcp_server, unsigned port_index, + unsigned fd_index) { + struct on_connect_result *result = arg; grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + result->server_fd = grpc_tcp_server_get_fd(tcp_server, port_index, fd_index); g_nconnects++; grpc_pollset_kick(&g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_tcp_server_unref(exec_ctx, tcp_server); } static void test_no_op(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_tcp_server *s = grpc_tcp_server_create(); - grpc_tcp_server_destroy(&exec_ctx, s, NULL); + grpc_tcp_server *s = grpc_tcp_server_create(NULL); + grpc_tcp_server_unref(&exec_ctx, s); grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_start(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_tcp_server *s = grpc_tcp_server_create(); + grpc_tcp_server *s = grpc_tcp_server_create(NULL); LOG_TEST("test_no_op_with_start"); grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL); - grpc_tcp_server_destroy(&exec_ctx, s, NULL); + grpc_tcp_server_unref(&exec_ctx, s); grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_port(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; struct sockaddr_in addr; - grpc_tcp_server *s = grpc_tcp_server_create(); + grpc_tcp_server *s = grpc_tcp_server_create(NULL); LOG_TEST("test_no_op_with_port"); memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; GPR_ASSERT( - grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); + grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)) > 0); - grpc_tcp_server_destroy(&exec_ctx, s, NULL); + grpc_tcp_server_unref(&exec_ctx, s); grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_port_and_start(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; struct sockaddr_in addr; - grpc_tcp_server *s = grpc_tcp_server_create(); + grpc_tcp_server *s = grpc_tcp_server_create(NULL); LOG_TEST("test_no_op_with_port_and_start"); memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; GPR_ASSERT( - grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); + grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)) > 0); grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL); - grpc_tcp_server_destroy(&exec_ctx, s, NULL); + grpc_tcp_server_unref(&exec_ctx, s); grpc_exec_ctx_finish(&exec_ctx); } @@ -111,32 +120,40 @@ static void test_connect(int n) { struct sockaddr_storage addr; socklen_t addr_len = sizeof(addr); int svrfd, clifd; - grpc_tcp_server *s = grpc_tcp_server_create(); + grpc_tcp_server *s = grpc_tcp_server_create(NULL); int nconnects_before; gpr_timespec deadline; grpc_pollset *pollsets[1]; int i; + struct on_connect_result result; LOG_TEST("test_connect"); gpr_log(GPR_INFO, "clients=%d", n); memset(&addr, 0, sizeof(addr)); addr.ss_family = AF_INET; - GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len)); - - svrfd = grpc_tcp_server_get_fd(s, 0); + GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len) > + 0); + GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 2) == 0); + GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 1) == 0); + GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 0) == 1); + GPR_ASSERT(grpc_tcp_server_get_fd(s, 0, 1) < 0); + GPR_ASSERT(grpc_tcp_server_get_fd(s, 0, 2) < 0); + GPR_ASSERT(grpc_tcp_server_get_fd(s, 2, 0) < 0); + GPR_ASSERT(grpc_tcp_server_get_fd(s, 1, 0) < 0); + svrfd = grpc_tcp_server_get_fd(s, 0, 0); GPR_ASSERT(svrfd >= 0); GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); GPR_ASSERT(addr_len <= sizeof(addr)); pollsets[0] = &g_pollset; - grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, NULL); + grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, &result); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (i = 0; i < n; i++) { deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); - nconnects_before = g_nconnects; + result.server_fd = -1; clifd = socket(addr.ss_family, SOCK_STREAM, 0); GPR_ASSERT(clifd >= 0); gpr_log(GPR_DEBUG, "start connect"); @@ -156,11 +173,12 @@ static void test_connect(int n) { GPR_ASSERT(g_nconnects == nconnects_before + 1); close(clifd); + GPR_ASSERT(svrfd == result.server_fd); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_tcp_server_destroy(&exec_ctx, s, NULL); + grpc_tcp_server_unref(&exec_ctx, s); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index 28e521221b9..0db25703c4a 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -66,12 +66,15 @@ static void pretty_print_backoffs(reconnect_server *server) { } } -static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) { +static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, + grpc_tcp_server *tcp_server, unsigned port_index, + unsigned fd_index) { char *peer; char *last_colon; reconnect_server *server = (reconnect_server *)arg; gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); timestamp_list *new_tail; + grpc_tcp_server_unref(NULL, tcp_server); peer = grpc_endpoint_get_peer(tcp); grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp); diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index 53b574d2856..cd7cd8db6af 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -58,7 +58,6 @@ void test_tcp_server_init(test_tcp_server *server, void test_tcp_server_start(test_tcp_server *server, int port) { struct sockaddr_in addr; - grpc_tcp_listener *listener; int port_added; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -66,9 +65,9 @@ void test_tcp_server_start(test_tcp_server *server, int port) { addr.sin_port = htons((uint16_t)port); memset(&addr.sin_addr, 0, sizeof(addr.sin_addr)); - server->tcp_server = grpc_tcp_server_create(); - listener = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); - port_added = grpc_tcp_listener_get_port(listener); + server->tcp_server = grpc_tcp_server_create(NULL); + port_added = + grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); GPR_ASSERT(port_added == port); grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1, @@ -106,7 +105,9 @@ void test_tcp_server_destroy(test_tcp_server *server) { grpc_closure do_nothing_cb; grpc_closure_init(&server_shutdown_cb, on_server_destroyed, server); grpc_closure_init(&do_nothing_cb, do_nothing, NULL); - grpc_tcp_server_destroy(&exec_ctx, server->tcp_server, &server_shutdown_cb); + grpc_tcp_server_set_shutdown_complete(server->tcp_server, + &server_shutdown_cb); + grpc_tcp_server_unref(&exec_ctx, server->tcp_server); shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(5, GPR_TIMESPAN)); while (!server->shutdown && From 5d81d15260a2f355580bc1ede89c69b456c71f9f Mon Sep 17 00:00:00 2001 From: Dan Born Date: Tue, 12 Jan 2016 20:29:29 -0800 Subject: [PATCH 02/17] Code review follow-up. --- src/core/iomgr/tcp_server.h | 26 +++++++++++-------- src/core/iomgr/tcp_server_posix.c | 23 +++++++--------- src/core/iomgr/tcp_server_windows.c | 22 +++++++--------- src/core/security/server_secure_chttp2.c | 4 +-- src/core/surface/server_chttp2.c | 7 +++-- .../set_initial_connect_string_test.c | 4 +-- test/core/iomgr/tcp_server_posix_test.c | 23 ++++++++-------- test/core/util/reconnect_server.c | 4 +-- test/core/util/test_tcp_server.c | 21 +++++++-------- test/core/util/test_tcp_server.h | 1 + tools/run_tests/run_tests.py | 8 +++--- 11 files changed, 66 insertions(+), 77 deletions(-) diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 92e504d20be..e4fd0999a9f 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -39,12 +39,20 @@ /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; -/* Called for newly connected TCP connections. Callee owns a ref on - from_server. */ +typedef struct grpc_tcp_server_acceptor grpc_tcp_server_acceptor; +struct grpc_tcp_server_acceptor { + /* grpc_tcp_server_cb functions share a ref on from_server that is valid + until the function returns. */ + grpc_tcp_server *from_server; + /* Indices that may be passed to grpc_tcp_server_port_fd(). */ + unsigned port_index; + unsigned fd_index; +}; + +/* Called for newly connected TCP connections. */ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *ep, - grpc_tcp_server *from_server, - unsigned port_index, unsigned fd_index); + grpc_tcp_server_acceptor *acceptor); /* Create a server, initially not bound to any ports. The caller owns one ref. If shutdown_complete is not NULL, it will be used by @@ -70,22 +78,18 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, /* Number of fds at the given port_index, or 0 if port_index is out of bounds. */ -unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index); +unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, unsigned port_index); /* Returns the file descriptor of the Mth (fd_index) listening socket of the Nth (port_index) call to add_port() on this server, or -1 if the indices are out of bounds. The file descriptor remains owned by the server, and will be cleaned up when grpc_tcp_server_destroy is called. */ -int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index, - unsigned fd_index); +int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index, + unsigned fd_index); /* Ref s and return s. */ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s); -/* Set or reset the shutdown_complete closure. shutdown_complete may be NULL. */ -void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s, - grpc_closure *shutdown_complete); - /* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue a call (exec_ctx!=NULL) to shutdown_complete. */ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s); diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index f1119f689ad..3e0c5be3483 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -212,8 +212,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, - grpc_tcp_server *s) { +static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -315,6 +314,8 @@ error: /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { grpc_tcp_listener *sp = arg; + grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, + sp->fd_index}; grpc_fd *fdobj; size_t i; @@ -363,7 +364,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), - grpc_tcp_server_ref(sp->server), sp->port_index, sp->fd_index); + &acceptor); gpr_free(name); gpr_free(addr_str); @@ -524,7 +525,8 @@ done: } } -unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index) { +unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, + unsigned port_index) { unsigned num_fds = 0; grpc_tcp_listener *sp; for (sp = s->head; sp && port_index != 0; sp = sp->next) { @@ -537,8 +539,8 @@ unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index) { return num_fds; } -int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index, - unsigned fd_index) { +int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index, + unsigned fd_index) { grpc_tcp_listener *sp; for (sp = s->head; sp && port_index != 0; sp = sp->next) { if (!sp->is_sibling) { @@ -585,19 +587,14 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { return s; } -void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s, - grpc_closure *shutdown_complete) { - s->shutdown_complete = shutdown_complete; -} - void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { if (exec_ctx == NULL) { grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_tcp_server_destroy(&local_exec_ctx, s); + tcp_server_destroy(&local_exec_ctx, s); grpc_exec_ctx_finish(&local_exec_ctx); } else { - grpc_tcp_server_destroy(exec_ctx, s); + tcp_server_destroy(exec_ctx, s); } } } diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 0435f5005bb..531fb003f7a 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -55,6 +55,7 @@ #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 /* one listening port */ +typedef struct grpc_tcp_listener grpc_tcp_listener; struct grpc_tcp_listener { /* This seemingly magic number comes from AcceptEx's documentation. each address buffer needs to have at least 16 more bytes at their end. */ @@ -134,8 +135,7 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { return s; } -static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, - grpc_tcp_server *s) { +static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { int immediately_done = 0; grpc_tcp_listener *sp; gpr_mu_lock(&s->mu); @@ -156,19 +156,14 @@ static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, } } -void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s, - grpc_closure *shutdown_complete) { - s->shutdown_complete = shutdown_complete; -} - void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { if (exec_ctx == NULL) { grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_tcp_server_destroy(&local_exec_ctx, s); + tcp_server_destroy(&local_exec_ctx, s); grpc_exec_ctx_finish(&local_exec_ctx); } else { - grpc_tcp_server_destroy(exec_ctx, s); + tcp_server_destroy(exec_ctx, s); } } } @@ -300,6 +295,7 @@ failure: /* Event manager callback when reads are ready. */ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { grpc_tcp_listener *sp = arg; + grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0}; SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; @@ -367,7 +363,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, - sp->server, sp->port_index, 0); + &acceptor); /* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next @@ -495,7 +491,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, } } -unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, int port_index) { +unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, int port_index) { grpc_tcp_listener *sp; for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index) ; @@ -506,8 +502,8 @@ unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, int port_index) { } } -int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index, - unsigned fd_index) { +int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index, + unsigned fd_index) { grpc_tcp_listener *sp; if (fd_index != 0) { /* Windows implementation has only one fd per port_index. */ diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index f4f3ff49218..53c8ae4c802 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -127,9 +127,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, } static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, - grpc_tcp_server *from_server, unsigned port_index, - unsigned fd_index) { - grpc_tcp_server_unref(NULL, from_server); + grpc_tcp_server_acceptor *acceptor) { grpc_server_secure_state *state = statep; state_ref(state); grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp, diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index f0b9f211b3d..cdd6d4fd969 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -53,8 +53,8 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *server, } static void new_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_endpoint *tcp, grpc_tcp_server *tcp_server, - unsigned port_index, unsigned fd_index) { + grpc_endpoint *tcp, + grpc_tcp_server_acceptor *acceptor) { /* * Beware that the call to grpc_create_chttp2_transport() has to happen before * grpc_tcp_server_destroy(). This is fine here, but similar code @@ -66,7 +66,6 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server, exec_ctx, grpc_server_get_channel_args(server), tcp, 0); setup_transport(exec_ctx, server, transport); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); - grpc_tcp_server_unref(exec_ctx, tcp_server); } /* Server callback: start listening on our ports */ @@ -82,8 +81,8 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_set_shutdown_complete(tcp, destroy_done); grpc_tcp_server_unref(exec_ctx, tcp); + grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index 68832a4b4e5..bf7ef3f26be 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -79,10 +79,8 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { } static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, - grpc_tcp_server *tcp_server, unsigned port_index, - unsigned fd_index) { + grpc_tcp_server_acceptor *acceptor) { test_tcp_server *server = arg; - grpc_tcp_server_unref(NULL, tcp_server); grpc_closure_init(&on_read, handle_read, NULL); gpr_slice_buffer_init(&state.incoming_buffer); gpr_slice_buffer_init(&state.temp_incoming_buffer); diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 2791087b8a0..11848851d80 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -53,18 +53,17 @@ struct on_connect_result { }; static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, - grpc_tcp_server *tcp_server, unsigned port_index, - unsigned fd_index) { + grpc_tcp_server_acceptor *acceptor) { struct on_connect_result *result = arg; grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - result->server_fd = grpc_tcp_server_get_fd(tcp_server, port_index, fd_index); + result->server_fd = grpc_tcp_server_port_fd( + acceptor->from_server, acceptor->port_index, acceptor->fd_index); g_nconnects++; grpc_pollset_kick(&g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_tcp_server_unref(exec_ctx, tcp_server); } static void test_no_op(void) { @@ -133,14 +132,14 @@ static void test_connect(int n) { addr.ss_family = AF_INET; GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len) > 0); - GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 2) == 0); - GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 1) == 0); - GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 0) == 1); - GPR_ASSERT(grpc_tcp_server_get_fd(s, 0, 1) < 0); - GPR_ASSERT(grpc_tcp_server_get_fd(s, 0, 2) < 0); - GPR_ASSERT(grpc_tcp_server_get_fd(s, 2, 0) < 0); - GPR_ASSERT(grpc_tcp_server_get_fd(s, 1, 0) < 0); - svrfd = grpc_tcp_server_get_fd(s, 0, 0); + GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 2) == 0); + GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 1) == 0); + GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 0) == 1); + GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 1) < 0); + GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 2) < 0); + GPR_ASSERT(grpc_tcp_server_port_fd(s, 2, 0) < 0); + GPR_ASSERT(grpc_tcp_server_port_fd(s, 1, 0) < 0); + svrfd = grpc_tcp_server_port_fd(s, 0, 0); GPR_ASSERT(svrfd >= 0); GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); GPR_ASSERT(addr_len <= sizeof(addr)); diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index 0db25703c4a..fa51d810d87 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -67,14 +67,12 @@ static void pretty_print_backoffs(reconnect_server *server) { } static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, - grpc_tcp_server *tcp_server, unsigned port_index, - unsigned fd_index) { + grpc_tcp_server_acceptor *acceptor) { char *peer; char *last_colon; reconnect_server *server = (reconnect_server *)arg; gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); timestamp_list *new_tail; - grpc_tcp_server_unref(NULL, tcp_server); peer = grpc_endpoint_get_peer(tcp); grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp); diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index cd7cd8db6af..52300c1a2d2 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -45,10 +45,17 @@ #include "src/core/iomgr/tcp_server.h" #include "test/core/util/port.h" +static void on_server_destroyed(grpc_exec_ctx *exec_ctx, void *data, + int success) { + test_tcp_server *server = data; + server->shutdown = 1; +} + void test_tcp_server_init(test_tcp_server *server, grpc_tcp_server_cb on_connect, void *user_data) { grpc_init(); server->tcp_server = NULL; + grpc_closure_init(&server->shutdown_complete, on_server_destroyed, server); server->shutdown = 0; grpc_pollset_init(&server->pollset); server->pollsets[0] = &server->pollset; @@ -65,7 +72,7 @@ void test_tcp_server_start(test_tcp_server *server, int port) { addr.sin_port = htons((uint16_t)port); memset(&addr.sin_addr, 0, sizeof(addr.sin_addr)); - server->tcp_server = grpc_tcp_server_create(NULL); + server->tcp_server = grpc_tcp_server_create(&server->shutdown_complete); port_added = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); GPR_ASSERT(port_added == port); @@ -90,24 +97,14 @@ void test_tcp_server_poll(test_tcp_server *server, int seconds) { grpc_exec_ctx_finish(&exec_ctx); } -static void on_server_destroyed(grpc_exec_ctx *exec_ctx, void *data, - int success) { - test_tcp_server *server = data; - server->shutdown = 1; -} - static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {} void test_tcp_server_destroy(test_tcp_server *server) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_timespec shutdown_deadline; - grpc_closure server_shutdown_cb; grpc_closure do_nothing_cb; - grpc_closure_init(&server_shutdown_cb, on_server_destroyed, server); - grpc_closure_init(&do_nothing_cb, do_nothing, NULL); - grpc_tcp_server_set_shutdown_complete(server->tcp_server, - &server_shutdown_cb); grpc_tcp_server_unref(&exec_ctx, server->tcp_server); + grpc_closure_init(&do_nothing_cb, do_nothing, NULL); shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(5, GPR_TIMESPAN)); while (!server->shutdown && diff --git a/test/core/util/test_tcp_server.h b/test/core/util/test_tcp_server.h index deb65eef119..5299a8f3064 100644 --- a/test/core/util/test_tcp_server.h +++ b/test/core/util/test_tcp_server.h @@ -39,6 +39,7 @@ typedef struct test_tcp_server { grpc_tcp_server *tcp_server; + grpc_closure shutdown_complete; int shutdown; grpc_pollset pollset; grpc_pollset *pollsets[1]; diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 0de20a634a4..206848bf409 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -114,10 +114,13 @@ class ValgrindConfig(object): self.args = args self.allow_hashing = False - def job_spec(self, cmdline, hash_targets): + def job_spec(self, cmdline, hash_targets, timeout_seconds=None, + shortname=None, environ=None): + if shortname is None: + shortname = 'valgrind %s' % cmdline[0] return jobset.JobSpec(cmdline=['valgrind', '--tool=%s' % self.tool] + self.args + cmdline, - shortname='valgrind %s' % cmdline[0], + shortname=shortname, hash_targets=None, flake_retries=5 if args.allow_flakes else 0, timeout_retries=3 if args.allow_flakes else 0) @@ -1092,4 +1095,3 @@ else: if BuildAndRunError.POST_TEST in errors: exit_code |= 4 sys.exit(exit_code) - From 03c8a9b365a4a819617909e16e9c94632de5a36d Mon Sep 17 00:00:00 2001 From: Dan Born Date: Tue, 12 Jan 2016 21:14:29 -0800 Subject: [PATCH 03/17] Put the fd_index in the same order as the sibling list. --- src/core/iomgr/tcp_server_posix.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 3e0c5be3483..e37da136af5 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -509,11 +509,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, addr_len = sizeof(addr4_copy); } sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index); - if (sp2 != NULL) { - if (sp != NULL) { - sp->sibling = sp2; - } - sp2->is_sibling = 1; + if (sp2 != NULL && sp != NULL) { + sp2->sibling = sp; + sp->is_sibling = 1; } done: From 725ee28af8924670a0a9974094b5ea773c519419 Mon Sep 17 00:00:00 2001 From: Dan Born Date: Wed, 13 Jan 2016 13:14:56 -0800 Subject: [PATCH 04/17] Extend Copyrights to 2016 --- src/core/iomgr/tcp_server.h | 2 +- src/core/iomgr/tcp_server_posix.c | 2 +- src/core/iomgr/tcp_server_windows.c | 2 +- src/core/security/server_secure_chttp2.c | 2 +- src/core/surface/server_chttp2.c | 2 +- test/core/client_config/set_initial_connect_string_test.c | 2 +- test/core/iomgr/tcp_server_posix_test.c | 2 +- test/core/util/reconnect_server.c | 2 +- test/core/util/test_tcp_server.c | 2 +- test/core/util/test_tcp_server.h | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index e4fd0999a9f..5199c25eed5 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index e37da136af5..6a22c08edba 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 531fb003f7a..9a831722df0 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 53c8ae4c802..08713fceaf8 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index cdd6d4fd969..6e21d2dcd77 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index bf7ef3f26be..33cab715b2a 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 11848851d80..f1c7cfd02fe 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index fa51d810d87..57225aa8a33 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index 52300c1a2d2..aaba7be3569 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/core/util/test_tcp_server.h b/test/core/util/test_tcp_server.h index 5299a8f3064..51119cf6c80 100644 --- a/test/core/util/test_tcp_server.h +++ b/test/core/util/test_tcp_server.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without From 50ef3042a16a0db47449840b25a0a0cc837d8d78 Mon Sep 17 00:00:00 2001 From: Dan Born Date: Wed, 13 Jan 2016 13:35:33 -0800 Subject: [PATCH 05/17] Fix windows compilation errors --- src/core/iomgr/tcp_server_windows.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 9a831722df0..aad2e5f358a 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -125,7 +125,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { s->head = sp->next; sp->next = NULL; grpc_winsocket_destroy(sp->socket); - grpc_tcp_listener_unref(sp); + gpr_free(sp); } gpr_free(s); } @@ -417,7 +417,6 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, sp->new_socket = INVALID_SOCKET; sp->port = port; sp->port_index = port_index; - gpr_ref_init(&sp->refs, 1); grpc_closure_init(&sp->on_accept, on_accept, sp); GPR_ASSERT(sp->socket); gpr_mu_unlock(&s->mu); @@ -491,7 +490,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, } } -unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, int port_index) { +unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, + unsigned port_index) { grpc_tcp_listener *sp; for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index) ; From 601acadc91f0b469af0cc763824381a202d73059 Mon Sep 17 00:00:00 2001 From: Dan Born Date: Wed, 13 Jan 2016 19:52:58 -0800 Subject: [PATCH 06/17] Test a server with multiple ports --- test/core/iomgr/tcp_server_posix_test.c | 177 +++++++++++++++++------- 1 file changed, 127 insertions(+), 50 deletions(-) diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index f1c7cfd02fe..b336002429a 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -33,11 +33,13 @@ #include "src/core/iomgr/tcp_server.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/sockaddr_utils.h" #include #include #include #include "test/core/util/test_config.h" +#include #include #include #include @@ -48,18 +50,23 @@ static grpc_pollset g_pollset; static int g_nconnects = 0; -struct on_connect_result { +typedef struct on_connect_result { + unsigned port_index; + unsigned fd_index; int server_fd; -}; +} on_connect_result; + +static on_connect_result g_result = {0, 0, -1}; static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_tcp_server_acceptor *acceptor) { - struct on_connect_result *result = arg; grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - result->server_fd = grpc_tcp_server_port_fd( + g_result.port_index = acceptor->port_index; + g_result.fd_index = acceptor->fd_index; + g_result.server_fd = grpc_tcp_server_port_fd( acceptor->from_server, acceptor->port_index, acceptor->fd_index); g_nconnects++; grpc_pollset_kick(&g_pollset, NULL); @@ -114,68 +121,138 @@ static void test_no_op_with_port_and_start(void) { grpc_exec_ctx_finish(&exec_ctx); } -static void test_connect(int n) { +static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, + socklen_t remote_len, on_connect_result *result) { + gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); + int clifd = socket(remote->sa_family, SOCK_STREAM, 0); + int nconnects_before; + + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + nconnects_before = g_nconnects; + g_result.server_fd = -1; + GPR_ASSERT(clifd >= 0); + gpr_log(GPR_DEBUG, "start connect"); + GPR_ASSERT(connect(clifd, remote, remote_len) == 0); + gpr_log(GPR_DEBUG, "wait"); + while (g_nconnects == nconnects_before && + gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { + grpc_pollset_worker worker; + grpc_pollset_work(exec_ctx, &g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_exec_ctx_finish(exec_ctx); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + } + gpr_log(GPR_DEBUG, "wait done"); + GPR_ASSERT(g_nconnects == nconnects_before + 1); + close(clifd); + *result = g_result; + + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); +} + +/* grpc_tcp_server_add_port() will not pick a second port randomly. We must find + one by binding one randomly, closing the socket, and returning the port + number. This might fail because of races. */ +static int unused_port() { + int s = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_storage addr; + socklen_t addr_len = sizeof(addr); + GPR_ASSERT(s >= 0); + memset(&addr, 0, sizeof(addr)); + addr.ss_family = AF_INET; + if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) != 0) { + gpr_log(GPR_ERROR, "bind: %s", strerror(errno)); + abort(); + } + GPR_ASSERT(getsockname(s, (struct sockaddr *)&addr, &addr_len) == 0); + GPR_ASSERT(addr_len <= sizeof(addr)); + close(s); + return ntohs(((struct sockaddr_in *)&addr)->sin_port); +} + +/* Tests a tcp server with multiple ports. TODO(daniel-j-born): Multiple fds for + the same port should be tested. */ +static void test_connect(unsigned n) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; struct sockaddr_storage addr; + struct sockaddr_storage addr1; socklen_t addr_len = sizeof(addr); - int svrfd, clifd; + unsigned svr_fd_count; + int svr_port; + unsigned svr1_fd_count; + int svr1_port; grpc_tcp_server *s = grpc_tcp_server_create(NULL); - int nconnects_before; - gpr_timespec deadline; grpc_pollset *pollsets[1]; - int i; - struct on_connect_result result; + unsigned i; LOG_TEST("test_connect"); gpr_log(GPR_INFO, "clients=%d", n); - memset(&addr, 0, sizeof(addr)); - addr.ss_family = AF_INET; - GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len) > - 0); + memset(&addr1, 0, sizeof(addr1)); + addr.ss_family = addr1.ss_family = AF_INET; + svr_port = grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len); + GPR_ASSERT(svr_port > 0); + svr1_port = unused_port(); + grpc_sockaddr_set_port((struct sockaddr *)&addr1, svr1_port); + GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr1, addr_len) == + svr1_port); + + /* Bad port_index. */ GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 2) == 0); - GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 1) == 0); - GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 0) == 1); - GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 1) < 0); - GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 2) < 0); GPR_ASSERT(grpc_tcp_server_port_fd(s, 2, 0) < 0); - GPR_ASSERT(grpc_tcp_server_port_fd(s, 1, 0) < 0); - svrfd = grpc_tcp_server_port_fd(s, 0, 0); - GPR_ASSERT(svrfd >= 0); - GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); - GPR_ASSERT(addr_len <= sizeof(addr)); - pollsets[0] = &g_pollset; - grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, &result); + /* Bad fd_index. */ + GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 100) < 0); + GPR_ASSERT(grpc_tcp_server_port_fd(s, 1, 100) < 0); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + /* Got at least one fd per port. */ + svr_fd_count = grpc_tcp_server_port_fd_count(s, 0); + GPR_ASSERT(svr_fd_count >= 1); + svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1); + GPR_ASSERT(svr1_fd_count >= 1); - for (i = 0; i < n; i++) { - deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); - nconnects_before = g_nconnects; - result.server_fd = -1; - clifd = socket(addr.ss_family, SOCK_STREAM, 0); - GPR_ASSERT(clifd >= 0); - gpr_log(GPR_DEBUG, "start connect"); - GPR_ASSERT(connect(clifd, (struct sockaddr *)&addr, addr_len) == 0); - - gpr_log(GPR_DEBUG, "wait"); - while (g_nconnects == nconnects_before && - gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { - grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + for (i = 0; i < svr_fd_count; ++i) { + int fd = grpc_tcp_server_port_fd(s, 0, i); + GPR_ASSERT(fd >= 0); + if (i == 0) { + GPR_ASSERT(getsockname(fd, (struct sockaddr *)&addr, &addr_len) == 0); + GPR_ASSERT(addr_len <= sizeof(addr)); + } + } + for (i = 0; i < svr1_fd_count; ++i) { + int fd = grpc_tcp_server_port_fd(s, 1, i); + GPR_ASSERT(fd >= 0); + if (i == 0) { + GPR_ASSERT(getsockname(fd, (struct sockaddr *)&addr1, &addr_len) == 0); + GPR_ASSERT(addr_len <= sizeof(addr1)); } - gpr_log(GPR_DEBUG, "wait done"); - - GPR_ASSERT(g_nconnects == nconnects_before + 1); - close(clifd); - GPR_ASSERT(svrfd == result.server_fd); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + pollsets[0] = &g_pollset; + grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, NULL); + + for (i = 0; i < n; i++) { + on_connect_result result = {0, 0, -1}; + int svr_fd; + tcp_connect(&exec_ctx, (struct sockaddr *)&addr, addr_len, &result); + GPR_ASSERT(result.server_fd >= 0); + svr_fd = result.server_fd; + GPR_ASSERT(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) + == result.server_fd); + GPR_ASSERT(result.port_index == 0); + GPR_ASSERT(result.fd_index < svr_fd_count); + + result.port_index = 0; + result.fd_index = 0; + result.server_fd = -1; + tcp_connect(&exec_ctx, (struct sockaddr *)&addr1, addr_len, &result); + GPR_ASSERT(result.server_fd >= 0); + GPR_ASSERT(result.server_fd != svr_fd); + GPR_ASSERT(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) + == result.server_fd); + GPR_ASSERT(result.port_index == 1); + GPR_ASSERT(result.fd_index < svr_fd_count); + } grpc_tcp_server_unref(&exec_ctx, s); grpc_exec_ctx_finish(&exec_ctx); From a78ca38e108610672b38cf0ecb0507d167ee25ce Mon Sep 17 00:00:00 2001 From: Dan Born Date: Thu, 14 Jan 2016 13:02:09 -0800 Subject: [PATCH 07/17] clang reformat. --- src/core/iomgr/tcp_server_windows.c | 5 +++-- test/core/iomgr/tcp_server_posix_test.c | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index aad2e5f358a..46617a6c652 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -362,8 +362,9 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ - if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, - &acceptor); + if (ep) + sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, + &acceptor); /* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index b336002429a..44874a7ad51 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -237,8 +237,8 @@ static void test_connect(unsigned n) { tcp_connect(&exec_ctx, (struct sockaddr *)&addr, addr_len, &result); GPR_ASSERT(result.server_fd >= 0); svr_fd = result.server_fd; - GPR_ASSERT(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) - == result.server_fd); + GPR_ASSERT(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) == + result.server_fd); GPR_ASSERT(result.port_index == 0); GPR_ASSERT(result.fd_index < svr_fd_count); @@ -248,8 +248,8 @@ static void test_connect(unsigned n) { tcp_connect(&exec_ctx, (struct sockaddr *)&addr1, addr_len, &result); GPR_ASSERT(result.server_fd >= 0); GPR_ASSERT(result.server_fd != svr_fd); - GPR_ASSERT(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) - == result.server_fd); + GPR_ASSERT(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) == + result.server_fd); GPR_ASSERT(result.port_index == 1); GPR_ASSERT(result.fd_index < svr_fd_count); } From d890a217da23eb141f38ba1c15e404d634a7bcb1 Mon Sep 17 00:00:00 2001 From: Dan Born Date: Thu, 14 Jan 2016 15:00:21 -0800 Subject: [PATCH 08/17] Fix port allocation in test. --- test/core/iomgr/tcp_server_posix_test.c | 30 +++++-------------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 44874a7ad51..564693367d3 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -37,6 +37,7 @@ #include #include #include +#include "test/core/util/port.h" #include "test/core/util/test_config.h" #include @@ -151,29 +152,9 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } -/* grpc_tcp_server_add_port() will not pick a second port randomly. We must find - one by binding one randomly, closing the socket, and returning the port - number. This might fail because of races. */ -static int unused_port() { - int s = socket(AF_INET, SOCK_STREAM, 0); - struct sockaddr_storage addr; - socklen_t addr_len = sizeof(addr); - GPR_ASSERT(s >= 0); - memset(&addr, 0, sizeof(addr)); - addr.ss_family = AF_INET; - if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) != 0) { - gpr_log(GPR_ERROR, "bind: %s", strerror(errno)); - abort(); - } - GPR_ASSERT(getsockname(s, (struct sockaddr *)&addr, &addr_len) == 0); - GPR_ASSERT(addr_len <= sizeof(addr)); - close(s); - return ntohs(((struct sockaddr_in *)&addr)->sin_port); -} - /* Tests a tcp server with multiple ports. TODO(daniel-j-born): Multiple fds for the same port should be tested. */ -static void test_connect(unsigned n) { +static void test_connect(int svr1_port, unsigned n) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; struct sockaddr_storage addr; struct sockaddr_storage addr1; @@ -181,7 +162,6 @@ static void test_connect(unsigned n) { unsigned svr_fd_count; int svr_port; unsigned svr1_fd_count; - int svr1_port; grpc_tcp_server *s = grpc_tcp_server_create(NULL); grpc_pollset *pollsets[1]; unsigned i; @@ -192,7 +172,6 @@ static void test_connect(unsigned n) { addr.ss_family = addr1.ss_family = AF_INET; svr_port = grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len); GPR_ASSERT(svr_port > 0); - svr1_port = unused_port(); grpc_sockaddr_set_port((struct sockaddr *)&addr1, svr1_port); GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr1, addr_len) == svr1_port); @@ -265,6 +244,7 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { int main(int argc, char **argv) { grpc_closure destroyed; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + int svr1_port = grpc_pick_unused_port_or_die(); grpc_test_init(argc, argv); grpc_iomgr_init(); grpc_pollset_init(&g_pollset); @@ -273,8 +253,8 @@ int main(int argc, char **argv) { test_no_op_with_start(); test_no_op_with_port(); test_no_op_with_port_and_start(); - test_connect(1); - test_connect(10); + test_connect(svr1_port, 1); + test_connect(svr1_port, 10); grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); From b13a69da41ddad7880f409c53d1f55982ee79ac5 Mon Sep 17 00:00:00 2001 From: Dan Born Date: Thu, 14 Jan 2016 16:53:18 -0800 Subject: [PATCH 09/17] tcp_server_posix_test fixes. Use grpc_shutdown() instead of grpc_iomgr_shutdown() to prevent grpc_pick_unused_port_or_die() from inappropriately destroying global state. Fix port allocation issues. --- test/core/iomgr/tcp_server_posix_test.c | 54 +++++++++++++++++-------- 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 564693367d3..0f92166ab26 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -34,6 +34,7 @@ #include "src/core/iomgr/tcp_server.h" #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/sockaddr_utils.h" +#include #include #include #include @@ -52,12 +53,30 @@ static grpc_pollset g_pollset; static int g_nconnects = 0; typedef struct on_connect_result { + /* Owns a ref to server. */ + grpc_tcp_server *server; unsigned port_index; unsigned fd_index; int server_fd; } on_connect_result; -static on_connect_result g_result = {0, 0, -1}; +void on_connect_result_init(on_connect_result *result) { + result->server = NULL; + result->port_index = 0; + result->fd_index = 0; + result->server_fd = -1; +} + +void on_connect_result_set(on_connect_result *result, + const grpc_tcp_server_acceptor *acceptor) { + result->server = grpc_tcp_server_ref(acceptor->from_server); + result->port_index = acceptor->port_index; + result->fd_index = acceptor->fd_index; + result->server_fd = grpc_tcp_server_port_fd( + result->server, acceptor->port_index, acceptor->fd_index); +} + +static on_connect_result g_result = {NULL, 0, 0, -1}; static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_tcp_server_acceptor *acceptor) { @@ -65,10 +84,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_endpoint_destroy(exec_ctx, tcp); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - g_result.port_index = acceptor->port_index; - g_result.fd_index = acceptor->fd_index; - g_result.server_fd = grpc_tcp_server_port_fd( - acceptor->from_server, acceptor->port_index, acceptor->fd_index); + on_connect_result_set(&g_result, acceptor); g_nconnects++; grpc_pollset_kick(&g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -130,7 +146,7 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); nconnects_before = g_nconnects; - g_result.server_fd = -1; + on_connect_result_init(&g_result); GPR_ASSERT(clifd >= 0); gpr_log(GPR_DEBUG, "start connect"); GPR_ASSERT(connect(clifd, remote, remote_len) == 0); @@ -154,7 +170,7 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, /* Tests a tcp server with multiple ports. TODO(daniel-j-born): Multiple fds for the same port should be tested. */ -static void test_connect(int svr1_port, unsigned n) { +static void test_connect(unsigned n) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; struct sockaddr_storage addr; struct sockaddr_storage addr1; @@ -162,6 +178,7 @@ static void test_connect(int svr1_port, unsigned n) { unsigned svr_fd_count; int svr_port; unsigned svr1_fd_count; + int svr1_port; grpc_tcp_server *s = grpc_tcp_server_create(NULL); grpc_pollset *pollsets[1]; unsigned i; @@ -172,6 +189,9 @@ static void test_connect(int svr1_port, unsigned n) { addr.ss_family = addr1.ss_family = AF_INET; svr_port = grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len); GPR_ASSERT(svr_port > 0); + /* Cannot use wildcard (port==0), because add_port() will try to reuse the + same port as a previous add_port(). */ + svr1_port = grpc_pick_unused_port_or_die(); grpc_sockaddr_set_port((struct sockaddr *)&addr1, svr1_port); GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr1, addr_len) == svr1_port); @@ -211,8 +231,9 @@ static void test_connect(int svr1_port, unsigned n) { grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, NULL); for (i = 0; i < n; i++) { - on_connect_result result = {0, 0, -1}; + on_connect_result result; int svr_fd; + on_connect_result_init(&result); tcp_connect(&exec_ctx, (struct sockaddr *)&addr, addr_len, &result); GPR_ASSERT(result.server_fd >= 0); svr_fd = result.server_fd; @@ -220,10 +241,10 @@ static void test_connect(int svr1_port, unsigned n) { result.server_fd); GPR_ASSERT(result.port_index == 0); GPR_ASSERT(result.fd_index < svr_fd_count); + GPR_ASSERT(result.server == s); + grpc_tcp_server_unref(&exec_ctx, result.server); - result.port_index = 0; - result.fd_index = 0; - result.server_fd = -1; + on_connect_result_init(&result); tcp_connect(&exec_ctx, (struct sockaddr *)&addr1, addr_len, &result); GPR_ASSERT(result.server_fd >= 0); GPR_ASSERT(result.server_fd != svr_fd); @@ -231,6 +252,8 @@ static void test_connect(int svr1_port, unsigned n) { result.server_fd); GPR_ASSERT(result.port_index == 1); GPR_ASSERT(result.fd_index < svr_fd_count); + GPR_ASSERT(result.server == s); + grpc_tcp_server_unref(&exec_ctx, result.server); } grpc_tcp_server_unref(&exec_ctx, s); @@ -244,21 +267,20 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { int main(int argc, char **argv) { grpc_closure destroyed; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - int svr1_port = grpc_pick_unused_port_or_die(); grpc_test_init(argc, argv); - grpc_iomgr_init(); + grpc_init(); grpc_pollset_init(&g_pollset); test_no_op(); test_no_op_with_start(); test_no_op_with_port(); test_no_op_with_port_and_start(); - test_connect(svr1_port, 1); - test_connect(svr1_port, 10); + test_connect(1); + test_connect(10); grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); - grpc_iomgr_shutdown(); + grpc_shutdown(); return 0; } From 9c12bc252e7bb8cdfe051884862e17d4850f3ab0 Mon Sep 17 00:00:00 2001 From: Dan Born Date: Wed, 13 Jan 2016 16:52:20 -0800 Subject: [PATCH 10/17] Add shutdown_starting callbacks to tcp_server. tcp_server_posix_test illustrates how this can be used to implement a weak referencing mechanism. --- src/core/iomgr/tcp_server.h | 7 ++++ src/core/iomgr/tcp_server_posix.c | 20 +++++++++- src/core/iomgr/tcp_server_windows.c | 20 +++++++++- test/core/iomgr/tcp_server_posix_test.c | 52 +++++++++++++++++++++++-- 4 files changed, 93 insertions(+), 6 deletions(-) diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 5199c25eed5..8f3184ff1e0 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -34,6 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H #define GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H +#include "src/core/iomgr/closure.h" #include "src/core/iomgr/endpoint.h" /* Forward decl of grpc_tcp_server */ @@ -90,6 +91,12 @@ int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index, /* Ref s and return s. */ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s); +/* shutdown_starting is called when ref count has reached zero and the server is + about to be destroyed. The server will be deleted after it returns. Calling + grpc_tcp_server_ref() from it has no effect. */ +void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, + grpc_closure *shutdown_starting); + /* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue a call (exec_ctx!=NULL) to shutdown_complete. */ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s); diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 6a22c08edba..adf14aeb597 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -128,6 +128,9 @@ struct grpc_tcp_server { grpc_tcp_listener *tail; unsigned nports; + /* List of closures passed to shutdown_starting_add(). */ + grpc_closure_list shutdown_starting; + /* shutdown callback */ grpc_closure *shutdown_complete; @@ -144,6 +147,8 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; + s->shutdown_starting.head = NULL; + s->shutdown_starting.tail = NULL; s->shutdown_complete = shutdown_complete; s->on_accept_cb = NULL; s->on_accept_cb_arg = NULL; @@ -585,13 +590,26 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { return s; } +void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, + grpc_closure *shutdown_starting) { + gpr_mu_lock(&s->mu); + grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1); + gpr_mu_unlock(&s->mu); +} + void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { + /* Complete shutdown_starting work before destroying. */ + grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_lock(&s->mu); + grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting); + gpr_mu_unlock(&s->mu); if (exec_ctx == NULL) { - grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx_flush(&local_exec_ctx); tcp_server_destroy(&local_exec_ctx, s); grpc_exec_ctx_finish(&local_exec_ctx); } else { + grpc_exec_ctx_finish(&local_exec_ctx); tcp_server_destroy(exec_ctx, s); } } diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 46617a6c652..8ee8149f251 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -93,6 +93,9 @@ struct grpc_tcp_server { grpc_tcp_listener *head; grpc_tcp_listener *tail; + /* List of closures passed to shutdown_starting_add(). */ + grpc_closure_list shutdown_starting; + /* shutdown callback */ grpc_closure *shutdown_complete; }; @@ -108,6 +111,8 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { s->on_accept_cb_arg = NULL; s->head = NULL; s->tail = NULL; + s->shutdown_starting.head = NULL; + s->shutdown_starting.tail = NULL; s->shutdown_complete = shutdown_complete; return s; } @@ -135,6 +140,13 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { return s; } +void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, + grpc_closure *shutdown_starting) { + gpr_mu_lock(&s->mu); + grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1); + gpr_mu_unlock(&s->mu); +} + static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { int immediately_done = 0; grpc_tcp_listener *sp; @@ -158,11 +170,17 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { + /* Complete shutdown_starting work before destroying. */ + grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_lock(&s->mu); + grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting); + gpr_mu_unlock(&s->mu); if (exec_ctx == NULL) { - grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx_flush(&local_exec_ctx); tcp_server_destroy(&local_exec_ctx, s); grpc_exec_ctx_finish(&local_exec_ctx); } else { + grpc_exec_ctx_finish(&local_exec_ctx); tcp_server_destroy(exec_ctx, s); } } diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 0f92166ab26..23b368df3af 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -60,15 +60,24 @@ typedef struct on_connect_result { int server_fd; } on_connect_result; -void on_connect_result_init(on_connect_result *result) { +typedef struct server_weak_ref { + grpc_tcp_server *server; + + /* arg is this server_weak_ref. */ + grpc_closure server_shutdown; +} server_weak_ref; + +static on_connect_result g_result = {NULL, 0, 0, -1}; + +static void on_connect_result_init(on_connect_result *result) { result->server = NULL; result->port_index = 0; result->fd_index = 0; result->server_fd = -1; } -void on_connect_result_set(on_connect_result *result, - const grpc_tcp_server_acceptor *acceptor) { +static void on_connect_result_set(on_connect_result *result, + const grpc_tcp_server_acceptor *acceptor) { result->server = grpc_tcp_server_ref(acceptor->from_server); result->port_index = acceptor->port_index; result->fd_index = acceptor->fd_index; @@ -76,7 +85,29 @@ void on_connect_result_set(on_connect_result *result, result->server, acceptor->port_index, acceptor->fd_index); } -static on_connect_result g_result = {NULL, 0, 0, -1}; + +static void server_weak_ref_shutdown(grpc_exec_ctx *exec_ctx, void *arg, + int success) { + server_weak_ref *weak_ref = arg; + weak_ref->server = NULL; +} + +static void server_weak_ref_init(server_weak_ref *weak_ref) { + weak_ref->server = NULL; + grpc_closure_init(&weak_ref->server_shutdown, server_weak_ref_shutdown, + weak_ref); +} + +/* Make weak_ref->server_shutdown a shutdown_starting cb on server. + grpc_tcp_server promises that the server object will live until + weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server + should be held until server_weak_ref_set() returns to avoid a race where the + server is deleted before the shutdown_starting cb is added. */ +static void server_weak_ref_set(server_weak_ref *weak_ref, + grpc_tcp_server *server) { + grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown); + weak_ref->server = server; +} static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_tcp_server_acceptor *acceptor) { @@ -182,6 +213,8 @@ static void test_connect(unsigned n) { grpc_tcp_server *s = grpc_tcp_server_create(NULL); grpc_pollset *pollsets[1]; unsigned i; + server_weak_ref weak_ref; + server_weak_ref_init(&weak_ref); LOG_TEST("test_connect"); gpr_log(GPR_INFO, "clients=%d", n); memset(&addr, 0, sizeof(addr)); @@ -242,6 +275,9 @@ static void test_connect(unsigned n) { GPR_ASSERT(result.port_index == 0); GPR_ASSERT(result.fd_index < svr_fd_count); GPR_ASSERT(result.server == s); + if (weak_ref.server == NULL) { + server_weak_ref_set(&weak_ref, result.server); + } grpc_tcp_server_unref(&exec_ctx, result.server); on_connect_result_init(&result); @@ -256,7 +292,15 @@ static void test_connect(unsigned n) { grpc_tcp_server_unref(&exec_ctx, result.server); } + /* Weak ref to server valid until final unref. */ + GPR_ASSERT(weak_ref.server != NULL); + GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 0) >= 0); + grpc_tcp_server_unref(&exec_ctx, s); + + /* Weak ref lost. */ + GPR_ASSERT(weak_ref.server == NULL); + grpc_exec_ctx_finish(&exec_ctx); } From 36b24109a42178d894b8c80f005bcb551c4986fb Mon Sep 17 00:00:00 2001 From: Dan Born Date: Fri, 15 Jan 2016 17:52:58 -0800 Subject: [PATCH 11/17] clang reformat --- test/core/iomgr/tcp_server_posix_test.c | 1 - 1 file changed, 1 deletion(-) diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 23b368df3af..f7097ac904d 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -85,7 +85,6 @@ static void on_connect_result_set(on_connect_result *result, result->server, acceptor->port_index, acceptor->fd_index); } - static void server_weak_ref_shutdown(grpc_exec_ctx *exec_ctx, void *arg, int success) { server_weak_ref *weak_ref = arg; From 25fe36ac83f33755c3062a365cd1b80235b3617f Mon Sep 17 00:00:00 2001 From: Dan Born Date: Thu, 21 Jan 2016 21:33:55 -0800 Subject: [PATCH 12/17] Revert run_tests.py to master. --- tools/run_tests/run_tests.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index e86467e3f23..637aff8585b 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -115,13 +115,10 @@ class ValgrindConfig(object): self.args = args self.allow_hashing = False - def job_spec(self, cmdline, hash_targets, cpu_cost=1.0, timeout_seconds=None, - shortname=None, environ=None): - if shortname is None: - shortname = 'valgrind %s' % cmdline[0] + def job_spec(self, cmdline, hash_targets, cpu_cost=1.0): return jobset.JobSpec(cmdline=['valgrind', '--tool=%s' % self.tool] + self.args + cmdline, - shortname=shortname, + shortname='valgrind %s' % cmdline[0], hash_targets=None, cpu_cost=cpu_cost, flake_retries=5 if args.allow_flakes else 0, From 72f9a67cea8ea075b4f7b553845c6292e5aacf8b Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 22 Jan 2016 14:27:58 -0800 Subject: [PATCH 13/17] Fix path where we check for ruby coverage report --- tools/run_tests/post_tests_ruby.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/run_tests/post_tests_ruby.sh b/tools/run_tests/post_tests_ruby.sh index 66a9fbc5348..0877e44805a 100755 --- a/tools/run_tests/post_tests_ruby.sh +++ b/tools/run_tests/post_tests_ruby.sh @@ -43,4 +43,4 @@ genhtml $tmp2 --output-directory $out rm $tmp2 rm $tmp1 -cp -rv $root/src/ruby/coverage $root/reports/ruby +cp -rv $root/coverage $root/reports/ruby From 38b06fbc3519352f97ad8ea6fb817e80b93e7451 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 22 Jan 2016 14:58:24 -0800 Subject: [PATCH 14/17] make sure we embed openssl and zlib in artifacts --- tools/run_tests/build_artifacts.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tools/run_tests/build_artifacts.py b/tools/run_tests/build_artifacts.py index 0fd5bc63f37..ff9dd4735ac 100755 --- a/tools/run_tests/build_artifacts.py +++ b/tools/run_tests/build_artifacts.py @@ -129,18 +129,19 @@ class CSharpExtArtifact: '/p:PlatformToolset=v120', '/p:Platform=%s' % msbuild_platform], shell=True) - if self.platform == 'linux': - environ = {'CONFIG': 'opt'} - return create_docker_jobspec(self.name, - 'tools/jenkins/grpc_artifact_linux_%s' % self.arch, - 'tools/run_tests/build_artifact_csharp.sh') else: - environ = {'CONFIG': 'opt'} - if self.platform == 'macos': + environ = {'CONFIG': 'opt', + 'EMBED_OPENSSL': 'true', + 'EMBED_ZLIB': 'true'} + if self.platform == 'linux': + return create_docker_jobspec(self.name, + 'tools/jenkins/grpc_artifact_linux_%s' % self.arch, + 'tools/run_tests/build_artifact_csharp.sh') + else: environ.update(macos_arch_env(self.arch)) - return create_jobspec(self.name, - ['tools/run_tests/build_artifact_csharp.sh'], - environ=environ) + return create_jobspec(self.name, + ['tools/run_tests/build_artifact_csharp.sh'], + environ=environ) def __str__(self): return self.name From 58a1dc2bb31f4bf3581131fba80497c40ddc947d Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Thu, 21 Jan 2016 14:23:55 -0800 Subject: [PATCH 15/17] Add compiler error diagnostics to Python setup --- PYTHON-MANIFEST.in | 1 + setup.py | 2 +- src/python/grpcio/commands.py | 10 +++- src/python/grpcio/support.py | 91 +++++++++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 3 deletions(-) create mode 100644 src/python/grpcio/support.py diff --git a/PYTHON-MANIFEST.in b/PYTHON-MANIFEST.in index 02bd9b52294..6e54e1b3c64 100644 --- a/PYTHON-MANIFEST.in +++ b/PYTHON-MANIFEST.in @@ -5,6 +5,7 @@ graft include/grpc graft third_party/boringssl include src/python/grpcio/commands.py include src/python/grpcio/grpc_core_dependencies.py +include src/python/grpcio/support.py include src/python/grpcio/README.rst include requirements.txt include etc/roots.pem diff --git a/setup.py b/setup.py index 63b56f35327..113e49ecfe5 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ BORINGSSL_INCLUDE = ('./third_party/boringssl/include',) # Ensure we're in the proper directory whether or not we're being used by pip. os.chdir(os.path.dirname(os.path.abspath(__file__))) -sys.path.insert(0, PYTHON_STEM) +sys.path.insert(0, os.path.abspath(PYTHON_STEM)) # Break import-style to ensure we can actually find our in-repo dependencies. import commands diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index bd12c5579cb..ff35c458617 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -37,9 +37,9 @@ import subprocess import sys import setuptools +from setuptools.command import build_ext from setuptools.command import build_py from setuptools.command import test -from setuptools.command import build_ext PYTHON_STEM = os.path.dirname(os.path.abspath(__file__)) @@ -186,7 +186,13 @@ class BuildExt(build_ext.build_ext): if compiler in BuildExt.LINK_OPTIONS: for extension in self.extensions: extension.extra_link_args += list(BuildExt.LINK_OPTIONS[compiler]) - build_ext.build_ext.build_extensions(self) + try: + build_ext.build_ext.build_extensions(self) + except KeyboardInterrupt: + raise + except Exception as error: + support.diagnose_build_ext_error(self, error) + raise CommandError("Failed `build_ext` step.") class Gather(setuptools.Command): diff --git a/src/python/grpcio/support.py b/src/python/grpcio/support.py new file mode 100644 index 00000000000..bbc509653d1 --- /dev/null +++ b/src/python/grpcio/support.py @@ -0,0 +1,91 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import os +import os.path +import shutil +import sys +import tempfile + +from distutils import errors + +import commands + + +C_PYTHON_DEV = """ +#include +int main(int argc, char **argv) { return 0; } +""" +C_PYTHON_DEV_ERROR_MESSAGE = """ +Could not find . This could mean the following: + * You're on Ubuntu and haven't `apt-get install`ed `python-dev`. + * You're on Mac OS X and the usual Python framework was somehow corrupted + (check your environment variables or try re-installing?) + * You're on Windows and your Python installation was somehow corrupted + (check your environment variables or try re-installing?) + * Note: Windows users should look into installing `vcpython27`. +""" + +C_CHECKS = { + C_PYTHON_DEV: C_PYTHON_DEV_ERROR_MESSAGE, +} + +def _compile(compiler, source_string): + tempdir = tempfile.mkdtemp() + cpath = os.path.join(tempdir, 'a.c') + with open(cpath, 'w') as cfile: + cfile.write(source_string) + try: + compiler.compile([cpath]) + except errors.CompileError as error: + return error + finally: + shutil.rmtree(tempdir) + +def _expect_compile(compiler, source_string, error_message): + if _compile(compiler, source_string) is not None: + sys.stderr.write(error_message) + raise commands.CommandError( + "Diagnostics found a compilation environment issue:\n{}" + .format(error_message)) + +def diagnose_build_ext_error(build_ext, error): + { + errors.CompileError: diagnose_compile_error + }[type(error)](build_ext, error) + +def diagnose_compile_error(build_ext, error): + """Attempt to run a few test files through the compiler to see if we can + diagnose the reason for the compile failure.""" + for c_check, message in C_CHECKS.items(): + _expect_compile(build_ext.compiler, c_check, message) + raise commands.CommandError( + "\n\nWe could not diagnose your build failure. Please file an issue at " + "http://www.github.com/grpc/grpc with `[Python install]` in the title.") From e399803234a4017b3ee11017b8e433cd38846c0f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 23 Jan 2016 11:50:16 -0800 Subject: [PATCH 16/17] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e0553ecc861..f894def4706 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [gRPC - An RPC library and framework](http://github.com/grpc/grpc) =================================== -Copyright 2015 Google Inc. +Copyright 2015-2016 Google Inc. #Documentation From 087543f670b2a725c6fb2236a8d7f8ca0fcc30ba Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 25 Jan 2016 11:38:25 -0800 Subject: [PATCH 17/17] Fix copyright --- tools/run_tests/post_tests_ruby.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/run_tests/post_tests_ruby.sh b/tools/run_tests/post_tests_ruby.sh index 0877e44805a..1a02e566c76 100755 --- a/tools/run_tests/post_tests_ruby.sh +++ b/tools/run_tests/post_tests_ruby.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without