diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index 74aad6cea3b..a830a27b0ba 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -33,6 +33,8 @@ #include "src/core/iomgr/exec_ctx.h" +#include + void grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { while (!grpc_closure_list_empty(exec_ctx->closure_list)) { grpc_closure *c = exec_ctx->closure_list.head; diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 3556b2c301b..c2f62a41b83 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -64,6 +64,7 @@ static void do_iocp_work(grpc_exec_ctx *exec_ctx) { LPOVERLAPPED overlapped; grpc_winsocket *socket; grpc_winsocket_callback_info *info; + grpc_closure *closure = NULL; success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped, INFINITE); /* success = 0 and overlapped = NULL means the deadline got attained. @@ -97,12 +98,15 @@ static void do_iocp_work(grpc_exec_ctx *exec_ctx) { GPR_ASSERT(!info->has_pending_iocp); gpr_mu_lock(&socket->state_mu); if (info->closure) { - grpc_exec_ctx_enqueue(exec_ctx, info->closure, 1); + closure = info->closure; info->closure = NULL; } else { info->has_pending_iocp = 1; } gpr_mu_unlock(&socket->state_mu); + if (closure) { + closure->cb(exec_ctx, closure->cb_arg, 1); + } } static void iocp_loop(void *p) { diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 5e835743af4..6182eb35328 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -115,6 +115,11 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_pollers = 0; } done: + if (!grpc_closure_list_empty(exec_ctx->closure_list)) { + gpr_mu_unlock(&pollset->mu); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->mu); + } gpr_cv_destroy(&worker->cv); if (added_worker) { remove_worker(pollset, worker); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 7244606e98c..4b11ab0f064 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -380,8 +380,9 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { - s->port_capacity *= 2; - s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + /* too many ports, and we need to store their address in a closure */ + /* TODO(ctiller): make server_port a linked list */ + abort(); } sp = &s->ports[s->nports++]; sp->server = s; @@ -389,6 +390,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, sp->shutting_down = 0; sp->AcceptEx = AcceptEx; sp->new_socket = INVALID_SOCKET; + grpc_closure_init(&sp->on_accept, on_accept, sp); GPR_ASSERT(sp->socket); gpr_mu_unlock(&s->mu); } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 2962762679d..9cae07316a9 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -394,7 +394,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { gpr_mu_init(&tcp->mu); gpr_ref_init(&tcp->refcount, 1); grpc_closure_init(&tcp->on_read, on_read, tcp); - grpc_closure_init(&tcp->on_read, on_write, tcp); + grpc_closure_init(&tcp->on_write, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); return &tcp->base; } diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py index 2a863191259..c0b9c02b3e6 100755 --- a/tools/run_tests/jobset.py +++ b/tools/run_tests/jobset.py @@ -174,6 +174,7 @@ class Job(object): for k, v in add_env.iteritems(): env[k] = v self._start = time.time() + message('START', spec.shortname, do_newline=travis) self._process = subprocess.Popen(args=spec.cmdline, stderr=subprocess.STDOUT, stdout=self._tempfile, @@ -185,7 +186,6 @@ class Job(object): self._travis = travis self._xml_test = ET.SubElement(xml_report, 'testcase', name=self._spec.shortname) if xml_report is not None else None - message('START', spec.shortname, do_newline=self._travis) def state(self, update_cache): """Poll current state of the job. Prints messages at completion."""