Code review follow-up.

reviewable/pr4680/r2
Dan Born 9 years ago
parent fa6b606898
commit 5d81d15260
  1. 26
      src/core/iomgr/tcp_server.h
  2. 23
      src/core/iomgr/tcp_server_posix.c
  3. 22
      src/core/iomgr/tcp_server_windows.c
  4. 4
      src/core/security/server_secure_chttp2.c
  5. 7
      src/core/surface/server_chttp2.c
  6. 4
      test/core/client_config/set_initial_connect_string_test.c
  7. 23
      test/core/iomgr/tcp_server_posix_test.c
  8. 4
      test/core/util/reconnect_server.c
  9. 21
      test/core/util/test_tcp_server.c
  10. 1
      test/core/util/test_tcp_server.h
  11. 8
      tools/run_tests/run_tests.py

@ -39,12 +39,20 @@
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;
/* Called for newly connected TCP connections. Callee owns a ref on
from_server. */
typedef struct grpc_tcp_server_acceptor grpc_tcp_server_acceptor;
struct grpc_tcp_server_acceptor {
/* grpc_tcp_server_cb functions share a ref on from_server that is valid
until the function returns. */
grpc_tcp_server *from_server;
/* Indices that may be passed to grpc_tcp_server_port_fd(). */
unsigned port_index;
unsigned fd_index;
};
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep,
grpc_tcp_server *from_server,
unsigned port_index, unsigned fd_index);
grpc_tcp_server_acceptor *acceptor);
/* Create a server, initially not bound to any ports. The caller owns one ref.
If shutdown_complete is not NULL, it will be used by
@ -70,22 +78,18 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
/* Number of fds at the given port_index, or 0 if port_index is out of
bounds. */
unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index);
unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, unsigned port_index);
/* Returns the file descriptor of the Mth (fd_index) listening socket of the Nth
(port_index) call to add_port() on this server, or -1 if the indices are out
of bounds. The file descriptor remains owned by the server, and will be
cleaned up when grpc_tcp_server_destroy is called. */
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index);
int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index);
/* Ref s and return s. */
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s);
/* Set or reset the shutdown_complete closure. shutdown_complete may be NULL. */
void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s,
grpc_closure *shutdown_complete);
/* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue
a call (exec_ctx!=NULL) to shutdown_complete. */
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s);

@ -212,8 +212,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}
static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
@ -315,6 +314,8 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
grpc_fd *fdobj;
size_t i;
@ -363,7 +364,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
grpc_tcp_server_ref(sp->server), sp->port_index, sp->fd_index);
&acceptor);
gpr_free(name);
gpr_free(addr_str);
@ -524,7 +525,8 @@ done:
}
}
unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index) {
unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s,
unsigned port_index) {
unsigned num_fds = 0;
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next) {
@ -537,8 +539,8 @@ unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, unsigned port_index) {
return num_fds;
}
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index) {
int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index) {
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next) {
if (!sp->is_sibling) {
@ -585,19 +587,14 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
return s;
}
void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s,
grpc_closure *shutdown_complete) {
s->shutdown_complete = shutdown_complete;
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
if (exec_ctx == NULL) {
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server_destroy(&local_exec_ctx, s);
tcp_server_destroy(&local_exec_ctx, s);
grpc_exec_ctx_finish(&local_exec_ctx);
} else {
grpc_tcp_server_destroy(exec_ctx, s);
tcp_server_destroy(exec_ctx, s);
}
}
}

@ -55,6 +55,7 @@
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
/* one listening port */
typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
/* This seemingly magic number comes from AcceptEx's documentation. each
address buffer needs to have at least 16 more bytes at their end. */
@ -134,8 +135,7 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
return s;
}
static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
int immediately_done = 0;
grpc_tcp_listener *sp;
gpr_mu_lock(&s->mu);
@ -156,19 +156,14 @@ static void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx,
}
}
void grpc_tcp_server_set_shutdown_complete(grpc_tcp_server *s,
grpc_closure *shutdown_complete) {
s->shutdown_complete = shutdown_complete;
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
if (exec_ctx == NULL) {
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server_destroy(&local_exec_ctx, s);
tcp_server_destroy(&local_exec_ctx, s);
grpc_exec_ctx_finish(&local_exec_ctx);
} else {
grpc_tcp_server_destroy(exec_ctx, s);
tcp_server_destroy(exec_ctx, s);
}
}
}
@ -300,6 +295,7 @@ failure:
/* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
@ -367,7 +363,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep,
sp->server, sp->port_index, 0);
&acceptor);
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
@ -495,7 +491,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
}
unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, int port_index) {
unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, int port_index) {
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
;
@ -506,8 +502,8 @@ unsigned grpc_tcp_server_fds_for_port(grpc_tcp_server *s, int port_index) {
}
}
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index) {
int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index) {
grpc_tcp_listener *sp;
if (fd_index != 0) {
/* Windows implementation has only one fd per port_index. */

@ -127,9 +127,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
}
static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
grpc_tcp_server *from_server, unsigned port_index,
unsigned fd_index) {
grpc_tcp_server_unref(NULL, from_server);
grpc_tcp_server_acceptor *acceptor) {
grpc_server_secure_state *state = statep;
state_ref(state);
grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp,

@ -53,8 +53,8 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
}
static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
grpc_endpoint *tcp, grpc_tcp_server *tcp_server,
unsigned port_index, unsigned fd_index) {
grpc_endpoint *tcp,
grpc_tcp_server_acceptor *acceptor) {
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code
@ -66,7 +66,6 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
exec_ctx, grpc_server_get_channel_args(server), tcp, 0);
setup_transport(exec_ctx, server, transport);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
grpc_tcp_server_unref(exec_ctx, tcp_server);
}
/* Server callback: start listening on our ports */
@ -82,8 +81,8 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_set_shutdown_complete(tcp, destroy_done);
grpc_tcp_server_unref(exec_ctx, tcp);
grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {

@ -79,10 +79,8 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_tcp_server *tcp_server, unsigned port_index,
unsigned fd_index) {
grpc_tcp_server_acceptor *acceptor) {
test_tcp_server *server = arg;
grpc_tcp_server_unref(NULL, tcp_server);
grpc_closure_init(&on_read, handle_read, NULL);
gpr_slice_buffer_init(&state.incoming_buffer);
gpr_slice_buffer_init(&state.temp_incoming_buffer);

@ -53,18 +53,17 @@ struct on_connect_result {
};
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_tcp_server *tcp_server, unsigned port_index,
unsigned fd_index) {
grpc_tcp_server_acceptor *acceptor) {
struct on_connect_result *result = arg;
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_destroy(exec_ctx, tcp);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
result->server_fd = grpc_tcp_server_get_fd(tcp_server, port_index, fd_index);
result->server_fd = grpc_tcp_server_port_fd(
acceptor->from_server, acceptor->port_index, acceptor->fd_index);
g_nconnects++;
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_tcp_server_unref(exec_ctx, tcp_server);
}
static void test_no_op(void) {
@ -133,14 +132,14 @@ static void test_connect(int n) {
addr.ss_family = AF_INET;
GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len) >
0);
GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 2) == 0);
GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 1) == 0);
GPR_ASSERT(grpc_tcp_server_fds_for_port(s, 0) == 1);
GPR_ASSERT(grpc_tcp_server_get_fd(s, 0, 1) < 0);
GPR_ASSERT(grpc_tcp_server_get_fd(s, 0, 2) < 0);
GPR_ASSERT(grpc_tcp_server_get_fd(s, 2, 0) < 0);
GPR_ASSERT(grpc_tcp_server_get_fd(s, 1, 0) < 0);
svrfd = grpc_tcp_server_get_fd(s, 0, 0);
GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 2) == 0);
GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 1) == 0);
GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 0) == 1);
GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 1) < 0);
GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 2) < 0);
GPR_ASSERT(grpc_tcp_server_port_fd(s, 2, 0) < 0);
GPR_ASSERT(grpc_tcp_server_port_fd(s, 1, 0) < 0);
svrfd = grpc_tcp_server_port_fd(s, 0, 0);
GPR_ASSERT(svrfd >= 0);
GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0);
GPR_ASSERT(addr_len <= sizeof(addr));

@ -67,14 +67,12 @@ static void pretty_print_backoffs(reconnect_server *server) {
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_tcp_server *tcp_server, unsigned port_index,
unsigned fd_index) {
grpc_tcp_server_acceptor *acceptor) {
char *peer;
char *last_colon;
reconnect_server *server = (reconnect_server *)arg;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
timestamp_list *new_tail;
grpc_tcp_server_unref(NULL, tcp_server);
peer = grpc_endpoint_get_peer(tcp);
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_destroy(exec_ctx, tcp);

@ -45,10 +45,17 @@
#include "src/core/iomgr/tcp_server.h"
#include "test/core/util/port.h"
static void on_server_destroyed(grpc_exec_ctx *exec_ctx, void *data,
int success) {
test_tcp_server *server = data;
server->shutdown = 1;
}
void test_tcp_server_init(test_tcp_server *server,
grpc_tcp_server_cb on_connect, void *user_data) {
grpc_init();
server->tcp_server = NULL;
grpc_closure_init(&server->shutdown_complete, on_server_destroyed, server);
server->shutdown = 0;
grpc_pollset_init(&server->pollset);
server->pollsets[0] = &server->pollset;
@ -65,7 +72,7 @@ void test_tcp_server_start(test_tcp_server *server, int port) {
addr.sin_port = htons((uint16_t)port);
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
server->tcp_server = grpc_tcp_server_create(NULL);
server->tcp_server = grpc_tcp_server_create(&server->shutdown_complete);
port_added =
grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr));
GPR_ASSERT(port_added == port);
@ -90,24 +97,14 @@ void test_tcp_server_poll(test_tcp_server *server, int seconds) {
grpc_exec_ctx_finish(&exec_ctx);
}
static void on_server_destroyed(grpc_exec_ctx *exec_ctx, void *data,
int success) {
test_tcp_server *server = data;
server->shutdown = 1;
}
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {}
void test_tcp_server_destroy(test_tcp_server *server) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_timespec shutdown_deadline;
grpc_closure server_shutdown_cb;
grpc_closure do_nothing_cb;
grpc_closure_init(&server_shutdown_cb, on_server_destroyed, server);
grpc_closure_init(&do_nothing_cb, do_nothing, NULL);
grpc_tcp_server_set_shutdown_complete(server->tcp_server,
&server_shutdown_cb);
grpc_tcp_server_unref(&exec_ctx, server->tcp_server);
grpc_closure_init(&do_nothing_cb, do_nothing, NULL);
shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(5, GPR_TIMESPAN));
while (!server->shutdown &&

@ -39,6 +39,7 @@
typedef struct test_tcp_server {
grpc_tcp_server *tcp_server;
grpc_closure shutdown_complete;
int shutdown;
grpc_pollset pollset;
grpc_pollset *pollsets[1];

@ -114,10 +114,13 @@ class ValgrindConfig(object):
self.args = args
self.allow_hashing = False
def job_spec(self, cmdline, hash_targets):
def job_spec(self, cmdline, hash_targets, timeout_seconds=None,
shortname=None, environ=None):
if shortname is None:
shortname = 'valgrind %s' % cmdline[0]
return jobset.JobSpec(cmdline=['valgrind', '--tool=%s' % self.tool] +
self.args + cmdline,
shortname='valgrind %s' % cmdline[0],
shortname=shortname,
hash_targets=None,
flake_retries=5 if args.allow_flakes else 0,
timeout_retries=3 if args.allow_flakes else 0)
@ -1092,4 +1095,3 @@ else:
if BuildAndRunError.POST_TEST in errors:
exit_code |= 4
sys.exit(exit_code)

Loading…
Cancel
Save