diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c index 58960b6028a..7c945ebad48 100644 --- a/src/core/iomgr/endpoint_pair_windows.c +++ b/src/core/iomgr/endpoint_pair_windows.c @@ -68,6 +68,8 @@ static void create_sockets(SOCKET sv[2]) { GPR_ASSERT(svr_sock != INVALID_SOCKET); closesocket(lst_sock); + grpc_tcp_prepare_socket(cli_sock); + grpc_tcp_prepare_socket(svr_sock); sv[1] = cli_sock; sv[0] = svr_sock; diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 1cdf3da0d62..8827bb99bcb 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -53,6 +53,7 @@ static OVERLAPPED g_iocp_custom_overlap; static gpr_event g_shutdown_iocp; static gpr_event g_iocp_done; static gpr_atm g_orphans = 0; +static gpr_atm g_custom_events = 0; static HANDLE g_iocp; @@ -62,20 +63,19 @@ static void do_iocp_work() { DWORD flags = 0; ULONG_PTR completion_key; LPOVERLAPPED overlapped; - gpr_timespec wait_time = gpr_inf_future; grpc_winsocket *socket; grpc_winsocket_callback_info *info; void(*f)(void *, int) = NULL; void *opaque = NULL; success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped, - gpr_time_to_millis(wait_time)); - if (!success && !overlapped) { - /* The deadline got attained. */ - return; - } + INFINITE); + /* success = 0 and overlapped = NULL means the deadline got attained. + Which is impossible. since our wait time is +inf */ + GPR_ASSERT(success || overlapped); GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { + gpr_atm_full_fetch_add(&g_custom_events, -1); if (completion_key == (ULONG_PTR) &g_iocp_kick_token) { /* We were awoken from a kick. */ return; @@ -93,13 +93,17 @@ static void do_iocp_work() { gpr_log(GPR_ERROR, "Unknown IOCP operation"); abort(); } - success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes, - FALSE, &flags); + GPR_ASSERT(info->outstanding); if (socket->orphan) { - grpc_winsocket_destroy(socket); - gpr_atm_full_fetch_add(&g_orphans, -1); + info->outstanding = 0; + if (!socket->read_info.outstanding && !socket->write_info.outstanding) { + grpc_winsocket_destroy(socket); + gpr_atm_full_fetch_add(&g_orphans, -1); + } return; } + success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes, + FALSE, &flags); info->bytes_transfered = bytes; info->wsa_error = success ? 0 : WSAGetLastError(); GPR_ASSERT(overlapped == &info->overlapped); @@ -117,10 +121,14 @@ static void do_iocp_work() { } static void iocp_loop(void *p) { - while (gpr_atm_acq_load(&g_orphans) || !gpr_event_get(&g_shutdown_iocp)) { + void * eventshutdown = NULL; + while (gpr_atm_acq_load(&g_orphans) || + gpr_atm_acq_load(&g_custom_events) || + !gpr_event_get(&g_shutdown_iocp)) { grpc_maybe_call_delayed_callbacks(NULL, 1); do_iocp_work(); } + gpr_log(GPR_DEBUG, "iocp_loop is done"); gpr_event_set(&g_iocp_done, (void *)1); } @@ -128,8 +136,8 @@ static void iocp_loop(void *p) { void grpc_iocp_init(void) { gpr_thd_id id; - g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, - (ULONG_PTR)NULL, 0); + g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, + NULL, (ULONG_PTR)NULL, 0); GPR_ASSERT(g_iocp); gpr_event_init(&g_iocp_done); @@ -140,6 +148,7 @@ void grpc_iocp_init(void) { void grpc_iocp_kick(void) { BOOL success; + gpr_atm_full_fetch_add(&g_custom_events, 1); success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR) &g_iocp_kick_token, &g_iocp_custom_overlap); diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 3e097a7633f..d95346f87ab 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -74,7 +74,7 @@ static void async_connect_cleanup(async_connect *ac) { static void on_alarm(void *acp, int occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); - /* If the alarm didn't occor, it got cancelled. */ + /* If the alarm didn't occur, it got cancelled. */ if (ac->socket != NULL && occured) { grpc_winsocket_shutdown(ac->socket); } @@ -98,6 +98,7 @@ static void on_connect(void *acp, int from_iocp) { if (from_iocp) { DWORD transfered_bytes = 0; DWORD flags; + info->outstanding = 0; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, &transfered_bytes, FALSE, &flags); @@ -194,6 +195,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), socket = grpc_winsocket_create(sock); info = &socket->write_info; + info->outstanding = 1; success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); /* It wouldn't be unusual to get a success immediately. But we'll still get diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index b37b274e87e..d22acc7453f 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -248,6 +248,7 @@ static void on_accept(void *arg, int from_iocp) { if (sp->shutting_down) { GPR_ASSERT(from_iocp); sp->shutting_down = 0; + sp->socket->read_info.outstanding = 0; gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_cv_broadcast(&sp->server->cv); @@ -419,6 +420,7 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { + s->ports[i].socket->read_info.outstanding = 1; start_accept(s->ports + i); s->active_ports++; } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 2c2df000057..f16b4c12688 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -172,7 +172,6 @@ static void win_notify_on_read(grpc_endpoint *ep, int status; DWORD bytes_read = 0; DWORD flags = 0; - int error; WSABUF buffer; GPR_ASSERT(!tcp->socket->read_info.outstanding); @@ -208,6 +207,15 @@ static void win_notify_on_read(grpc_endpoint *ep, status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, &info->overlapped, NULL); + if (status != 0) { + int wsa_error = WSAGetLastError(); + if (wsa_error != WSA_IO_PENDING) { + info->wsa_error = wsa_error; + on_read(tcp, 1); + return; + } + } + grpc_socket_notify_on_read(tcp->socket, on_read, tcp); } @@ -324,6 +332,16 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, &bytes_sent, 0, &socket->write_info.overlapped, NULL); if (allocated) gpr_free(allocated); + if (status != 0) { + int wsa_error = WSAGetLastError(); + if (wsa_error != WSA_IO_PENDING) { + gpr_slice_buffer_reset_and_unref(&tcp->write_slices); + tcp->socket->write_info.outstanding = 0; + tcp_unref(tcp); + return GRPC_ENDPOINT_WRITE_ERROR; + } + } + /* As all is now setup, we can now ask for the IOCP notification. It may trigger the callback immediately however, but no matter. */ grpc_socket_notify_on_write(socket, on_write, tcp);