diff --git a/include/grpc/support/log_win32.h b/include/grpc/support/log_win32.h index 7abd5df3941..0350056d26e 100644 --- a/include/grpc/support/log_win32.h +++ b/include/grpc/support/log_win32.h @@ -42,7 +42,7 @@ extern "C" { /* Returns a string allocated with gpr_malloc that contains a UTF-8 * formatted error message, corresponding to the error messageid. - * Use in cunjunction with GetLastError() et al. + * Use in conjunction with GetLastError() et al. */ char *gpr_format_message(DWORD messageid); diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index 22ca74314cb..bbf8cfc4190 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -48,4 +48,4 @@ void grpc_iomgr_platform_shutdown(void) { grpc_fd_global_shutdown(); } -#endif /* GRPC_IOMGRP_POSIX */ +#endif /* GRPC_POSIX_SOCKET */ diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c index 9cdf224ad77..5c8382e1c01 100644 --- a/src/core/iomgr/iomgr_windows.c +++ b/src/core/iomgr/iomgr_windows.c @@ -64,4 +64,4 @@ void grpc_iomgr_platform_shutdown(void) { winsock_shutdown(); } -#endif /* GRPC_IOMGRP_POSIX */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index fdebede4828..134e6f45e21 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -38,6 +38,8 @@ #include #include +#include +#include #include #include "src/core/iomgr/alarm_internal.h" @@ -53,6 +55,8 @@ static gpr_event g_shutdown_global_poller; static gpr_event g_global_poller_done; void grpc_pollset_init(grpc_pollset *pollset) { + gpr_mu_init(&pollset->mu); + gpr_cv_init(&pollset->cv); pollset->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0); GPR_ASSERT(pollset->iocp); @@ -60,6 +64,8 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_destroy(grpc_pollset *pollset) { BOOL status; + gpr_mu_destroy(&pollset->mu); + gpr_cv_destroy(&pollset->cv); status = CloseHandle(pollset->iocp); GPR_ASSERT(status); } @@ -76,10 +82,11 @@ static int pollset_poll(grpc_pollset *pollset, grpc_winsocket_callback_info *info; void(*f)(void *, int) = NULL; void *opaque = NULL; + gpr_mu_unlock(&pollset->mu); success = GetQueuedCompletionStatus(pollset->iocp, &bytes, &completion_key, &overlapped, gpr_time_to_millis(wait_time)); - + gpr_mu_lock(&pollset->mu); if (!success && !overlapped) { /* The deadline got attained. */ return 0; @@ -95,6 +102,8 @@ static int pollset_poll(grpc_pollset *pollset, abort(); } + GPR_ASSERT(pollset == &g_global_pollset); + socket = (grpc_winsocket*) completion_key; if (overlapped == &socket->write_info.overlapped) { gpr_log(GPR_DEBUG, "pollset_poll - got write packet"); @@ -153,7 +162,9 @@ void grpc_pollset_kick(grpc_pollset *pollset) { static void global_poller(void *p) { while (!gpr_event_get(&g_shutdown_global_poller)) { + gpr_mu_lock(&g_global_pollset.mu); grpc_pollset_work(&g_global_pollset, gpr_inf_future); + gpr_mu_unlock(&g_global_pollset.mu); } gpr_event_set(&g_global_poller_done, (void *) 1); @@ -176,9 +187,20 @@ void grpc_pollset_global_shutdown(void) { } void grpc_pollset_add_handle(grpc_pollset *pollset, grpc_winsocket *socket) { - HANDLE ret = CreateIoCompletionPort((HANDLE) socket->socket, pollset->iocp, - (gpr_uintptr) socket, 0); - GPR_ASSERT(ret == pollset->iocp); + HANDLE ret; + if (socket->added_to_iocp) return; + ret = CreateIoCompletionPort((HANDLE)socket->socket, + g_global_pollset.iocp, + (gpr_uintptr) socket, 0); + if (!ret) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message); + gpr_free(utf8_message); + __debugbreak(); + abort(); + } + socket->added_to_iocp = 1; + GPR_ASSERT(ret == g_global_pollset.iocp); } static void handle_notify_on_iocp(grpc_winsocket *socket, diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 036f3910a01..919af5d7b73 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -46,11 +46,13 @@ struct grpc_fd; typedef struct grpc_pollset { + gpr_mu mu; + gpr_cv cv; HANDLE iocp; } grpc_pollset; -#define GRPC_POLLSET_MU(pollset) (NULL) -#define GRPC_POLLSET_CV(pollset) (NULL) +#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) +#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv) void grpc_pollset_add_handle(grpc_pollset *, grpc_winsocket *); diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h index 350f5fa33e1..08be0e54f8e 100644 --- a/src/core/iomgr/sockaddr_win32.h +++ b/src/core/iomgr/sockaddr_win32.h @@ -38,4 +38,4 @@ #include #include -#endif // __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ +#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ */ diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 7d1d59c3187..805e96a0d15 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -56,7 +56,7 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) { void shutdown_op(grpc_winsocket_callback_info *info) { if (!info->cb) return; - info->cb(info->opaque, 0); + grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0); } void grpc_winsocket_shutdown(grpc_winsocket *socket) { @@ -73,4 +73,4 @@ void grpc_winsocket_orphan(grpc_winsocket *socket) { gpr_free(socket); } -#endif +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index ca85ea54cbf..19822ac085c 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -57,6 +57,8 @@ typedef struct grpc_winsocket_callback_info { typedef struct grpc_winsocket { SOCKET socket; + int added_to_iocp; + grpc_winsocket_callback_info write_info; grpc_winsocket_callback_info read_info; @@ -70,4 +72,4 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket); void grpc_winsocket_shutdown(grpc_winsocket *socket); void grpc_winsocket_orphan(grpc_winsocket *socket); -#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */ +#endif /* __GRPC_INTERNAL_IOMGR_HANDLE_WINDOWS_H__ */ diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 0eb6663dd48..967177fdb1c 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -58,9 +58,18 @@ static int set_non_block(SOCKET sock) { return status == 0; } +static int set_dualstack(SOCKET sock) { + int status; + unsigned long param = 0; + status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, ¶m, sizeof(param)); + return status == 0; +} + int grpc_tcp_prepare_socket(SOCKET sock) { if (!set_non_block(sock)) return 0; + if (!set_dualstack(sock)) + return 0; return 1; } @@ -110,8 +119,9 @@ static void on_read(void *tcpp, int success) { GPR_ASSERT(tcp->outstanding_read); if (!success) { - __debugbreak(); - abort(); + tcp_unref(tcp); + cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); + return; } gpr_log(GPR_DEBUG, "on_read"); @@ -163,7 +173,6 @@ static void win_notify_on_read(grpc_endpoint *ep, buffer.buf = GPR_SLICE_START_PTR(tcp->read_slice); gpr_log(GPR_DEBUG, "win_notify_on_read: calling WSARecv without overlap"); - status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL); info->wsa_error = status == 0 ? 0 : WSAGetLastError(); @@ -183,7 +192,7 @@ static void win_notify_on_read(grpc_endpoint *ep, &info->overlapped, NULL); if (status == 0) { - gpr_log(GPR_DEBUG, "got response immediately, but we're goint to sleep"); + gpr_log(GPR_DEBUG, "got response immediately, but we're going to sleep"); grpc_handle_notify_on_read(tcp->socket, on_read, tcp); return; } @@ -219,8 +228,9 @@ static void on_write(void *tcpp, int success) { gpr_log(GPR_DEBUG, "on_write"); if (!success) { - __debugbreak(); - abort(); + tcp_unref(tcp); + cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); + return; } if (info->wsa_error != 0) { @@ -286,6 +296,10 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, if (status == 0) { ret = GRPC_ENDPOINT_WRITE_DONE; GPR_ASSERT(bytes_sent == tcp->write_slices.length); + } else { + char *utf8_message = gpr_format_message(info->wsa_error); + gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message); + gpr_free(utf8_message); } if (allocated) gpr_free(allocated); gpr_slice_buffer_reset_and_unref(&tcp->write_slices); diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c index e1cf6fb10a4..4c0a866048f 100644 --- a/src/core/support/log_win32.c +++ b/src/core/support/log_win32.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -75,8 +76,20 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, /* Simple starter implementation */ void gpr_default_log(gpr_log_func_args *args) { - fprintf(stderr, "%s.%u %s:%d: %s\n", - gpr_log_severity_string(args->severity), GetCurrentThreadId(), + char time_buffer[64]; + gpr_timespec now = gpr_now(); + struct tm tm; + + if (localtime_s(&tm, &now.tv_sec)) { + strcpy(time_buffer, "error:localtime"); + } else if (0 == + strftime(time_buffer, sizeof(time_buffer), "%m%d %H:%M:%S", &tm)) { + strcpy(time_buffer, "error:strftime"); + } + + fprintf(stderr, "%s%s.%09u %5u %s:%d: %s\n", + gpr_log_severity_string(args->severity), time_buffer, + (int)(now.tv_nsec), GetCurrentThreadId(), args->file, args->line, args->message); }