From 03797e0ec36e76f08ecb51c74ebec0531d24fac4 Mon Sep 17 00:00:00 2001 From: Alex Polcyn Date: Mon, 17 Jun 2019 04:41:46 +0000 Subject: [PATCH] Simplify and fix the c-ares TCP path on Windows --- .../dns/c_ares/grpc_ares_ev_driver_windows.cc | 480 +++++++++++++++--- 1 file changed, 401 insertions(+), 79 deletions(-) diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index 7d5215938e3..df40ba1c4bd 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -31,6 +31,9 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/iocp_windows.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/sockaddr_windows.h" #include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/tcp_windows.h" #include "src/core/lib/slice/slice_internal.h" @@ -50,6 +53,32 @@ struct iovec { namespace grpc_core { +/* c-ares reads and takes action on the error codes of the + * "virtual socket operations" in this file, via the WSAGetLastError + * APIs. If code in this file wants to set a specific WSA error that + * c-ares should read, it must do so by calling SetWSAError() on the + * WSAErrorContext instance passed to it. A WSAErrorContext must only be + * instantiated at the top of the virtual socket function callstack. */ +class WSAErrorContext { + public: + explicit WSAErrorContext(){}; + + ~WSAErrorContext() { + if (error_ != 0) { + WSASetLastError(error_); + } + } + + /* Disallow copy and assignment operators */ + WSAErrorContext(const WSAErrorContext&) = delete; + WSAErrorContext& operator=(const WSAErrorContext&) = delete; + + void SetWSAError(int error) { error_ = error; } + + private: + int error_ = 0; +}; + /* c-ares creates its own sockets and is meant to read them when readable and * write them when writeable. To fit this socket usage model into the grpc * windows poller (which gives notifications when attempted reads and writes are @@ -68,11 +97,14 @@ class GrpcPolledFdWindows : public GrpcPolledFd { WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY, }; - GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner) + GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner, + int address_family, int socket_type) : read_buf_(grpc_empty_slice()), write_buf_(grpc_empty_slice()), - write_state_(WRITE_IDLE), - gotten_into_driver_list_(false) { + tcp_write_state_(WRITE_IDLE), + gotten_into_driver_list_(false), + address_family_(address_family), + socket_type_(socket_type) { gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as); winsocket_ = grpc_winsocket_create(as, name_); combiner_ = GRPC_COMBINER_REF(combiner, name_); @@ -82,6 +114,16 @@ class GrpcPolledFdWindows : public GrpcPolledFd { GRPC_CLOSURE_INIT(&outer_write_closure_, &GrpcPolledFdWindows::OnIocpWriteable, this, grpc_combiner_scheduler(combiner_)); + GRPC_CLOSURE_INIT(&on_tcp_connect_locked_, + &GrpcPolledFdWindows::OnTcpConnectLocked, this, + grpc_combiner_scheduler(combiner_)); + GRPC_CLOSURE_INIT(&continue_register_for_on_readable_locked_, + &GrpcPolledFdWindows::ContinueRegisterForOnReadableLocked, + this, grpc_combiner_scheduler(combiner_)); + GRPC_CLOSURE_INIT( + &continue_register_for_on_writeable_locked_, + &GrpcPolledFdWindows::ContinueRegisterForOnWriteableLocked, this, + grpc_combiner_scheduler(combiner_)); } ~GrpcPolledFdWindows() { @@ -111,6 +153,33 @@ class GrpcPolledFdWindows : public GrpcPolledFd { grpc_slice_unref_internal(read_buf_); GPR_ASSERT(!read_buf_has_data_); read_buf_ = GRPC_SLICE_MALLOC(4192); + if (connect_done_) { + GRPC_CLOSURE_SCHED(&continue_register_for_on_readable_locked_, + GRPC_ERROR_NONE); + } else { + GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == nullptr); + pending_continue_register_for_on_readable_locked_ = + &continue_register_for_on_readable_locked_; + } + } + + static void ContinueRegisterForOnReadableLocked(void* arg, + grpc_error* unused_error) { + GrpcPolledFdWindows* grpc_polled_fd = + static_cast(arg); + grpc_polled_fd->InnerContinueRegisterForOnReadableLocked(GRPC_ERROR_NONE); + } + + void InnerContinueRegisterForOnReadableLocked(grpc_error* unused_error) { + GRPC_CARES_TRACE_LOG( + "fd:|%s| InnerContinueRegisterForOnReadableLocked " + "wsa_connect_error_:%d", + GetName(), wsa_connect_error_); + GPR_ASSERT(connect_done_); + if (wsa_connect_error_ != 0) { + ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect")); + return; + } WSABUF buffer; buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_); buffer.len = GRPC_SLICE_LENGTH(read_buf_); @@ -123,13 +192,14 @@ class GrpcPolledFdWindows : public GrpcPolledFd { &winsocket_->read_info.overlapped, nullptr)) { int wsa_last_error = WSAGetLastError(); char* msg = gpr_format_message(wsa_last_error); - grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); GRPC_CARES_TRACE_LOG( - "RegisterForOnReadableLocked: WSARecvFrom error:|%s|. fd:|%s|", msg, - GetName()); + "fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| " + "msg:|%s|", + GetName(), wsa_last_error, msg); gpr_free(msg); if (wsa_last_error != WSA_IO_PENDING) { - ScheduleAndNullReadClosure(error); + ScheduleAndNullReadClosure( + GRPC_WSA_ERROR(wsa_last_error, "WSARecvFrom")); return; } } @@ -137,23 +207,68 @@ class GrpcPolledFdWindows : public GrpcPolledFd { } void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { - GRPC_CARES_TRACE_LOG( - "RegisterForOnWriteableLocked. fd:|%s|. Current write state: %d", - GetName(), write_state_); + if (socket_type_ == SOCK_DGRAM) { + GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called", + GetName()); + } else { + GPR_ASSERT(socket_type_ == SOCK_STREAM); + GRPC_CARES_TRACE_LOG( + "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d", + GetName(), tcp_write_state_); + } GPR_ASSERT(write_closure_ == nullptr); write_closure_ = write_closure; - switch (write_state_) { - case WRITE_IDLE: - ScheduleAndNullWriteClosure(GRPC_ERROR_NONE); - break; - case WRITE_REQUESTED: - write_state_ = WRITE_PENDING; - SendWriteBuf(nullptr, &winsocket_->write_info.overlapped); - grpc_socket_notify_on_write(winsocket_, &outer_write_closure_); - break; - case WRITE_PENDING: - case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY: - abort(); + if (connect_done_) { + GRPC_CLOSURE_SCHED(&continue_register_for_on_writeable_locked_, + GRPC_ERROR_NONE); + } else { + GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == nullptr); + pending_continue_register_for_on_writeable_locked_ = + &continue_register_for_on_writeable_locked_; + } + } + + static void ContinueRegisterForOnWriteableLocked(void* arg, + grpc_error* unused_error) { + GrpcPolledFdWindows* grpc_polled_fd = + static_cast(arg); + grpc_polled_fd->InnerContinueRegisterForOnWriteableLocked(GRPC_ERROR_NONE); + } + + void InnerContinueRegisterForOnWriteableLocked(grpc_error* unused_error) { + GRPC_CARES_TRACE_LOG( + "fd:|%s| InnerContinueRegisterForOnWriteableLocked " + "wsa_connect_error_:%d", + GetName(), wsa_connect_error_); + GPR_ASSERT(connect_done_); + if (wsa_connect_error_ != 0) { + ScheduleAndNullWriteClosure( + GRPC_WSA_ERROR(wsa_connect_error_, "connect")); + return; + } + if (socket_type_ == SOCK_DGRAM) { + ScheduleAndNullWriteClosure(GRPC_ERROR_NONE); + } else { + GPR_ASSERT(socket_type_ == SOCK_STREAM); + int wsa_error_code = 0; + switch (tcp_write_state_) { + case WRITE_IDLE: + ScheduleAndNullWriteClosure(GRPC_ERROR_NONE); + break; + case WRITE_REQUESTED: + tcp_write_state_ = WRITE_PENDING; + if (SendWriteBuf(nullptr, &winsocket_->write_info.overlapped, + &wsa_error_code) != 0) { + ScheduleAndNullWriteClosure( + GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)")); + } else { + grpc_socket_notify_on_write(winsocket_, &outer_write_closure_); + } + break; + case WRITE_PENDING: + case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY: + abort(); + } } } @@ -171,13 +286,15 @@ class GrpcPolledFdWindows : public GrpcPolledFd { const char* GetName() override { return name_; } - ares_ssize_t RecvFrom(void* data, ares_socket_t data_len, int flags, + ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data, + ares_socket_t data_len, int flags, struct sockaddr* from, ares_socklen_t* from_len) { GRPC_CARES_TRACE_LOG( - "RecvFrom called on fd:|%s|. Current read buf length:|%d|", GetName(), - GRPC_SLICE_LENGTH(read_buf_)); + "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf " + "length:|%d|", + GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_)); if (!read_buf_has_data_) { - WSASetLastError(WSAEWOULDBLOCK); + wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); return -1; } ares_ssize_t bytes_read = 0; @@ -215,54 +332,99 @@ class GrpcPolledFdWindows : public GrpcPolledFd { return out; } - int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped) { + int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped, + int* wsa_error_code) { WSABUF buf; buf.len = GRPC_SLICE_LENGTH(write_buf_); buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_); DWORD flags = 0; int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1, bytes_sent_ptr, flags, overlapped, nullptr); + *wsa_error_code = WSAGetLastError(); GRPC_CARES_TRACE_LOG( - "WSASend: name:%s. buf len:%d. bytes sent: %d. overlapped %p. return " - "val: %d", - GetName(), buf.len, *bytes_sent_ptr, overlapped, out); + "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d " + "overlapped:%p " + "return:%d *wsa_error_code:%d", + GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0, + overlapped, out, *wsa_error_code); return out; } - ares_ssize_t TrySendWriteBufSyncNonBlocking() { - GPR_ASSERT(write_state_ == WRITE_IDLE); + ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov, + int iov_count) { + GRPC_CARES_TRACE_LOG( + "fd:|%s| SendV called connect_done_:%d was_connect_error_:%d", + GetName(), connect_done_, wsa_connect_error_); + if (!connect_done_) { + wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); + return -1; + } + if (wsa_connect_error_ != 0) { + wsa_error_ctx->SetWSAError(wsa_connect_error_); + return -1; + } + switch (socket_type_) { + case SOCK_DGRAM: + return SendVUDP(wsa_error_ctx, iov, iov_count); + case SOCK_STREAM: + return SendVTCP(wsa_error_ctx, iov, iov_count); + default: + abort(); + } + } + + ares_ssize_t SendVUDP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov, + int iov_count) { + // c-ares doesn't handle retryable errors on writes of UDP sockets. + // Therefore, the sendv handler for UDP sockets must only attempt + // to write everything inline. + GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName()); + GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0); + grpc_slice_unref_internal(write_buf_); + write_buf_ = FlattenIovec(iov, iov_count); DWORD bytes_sent = 0; - if (SendWriteBuf(&bytes_sent, nullptr) != 0) { - int wsa_last_error = WSAGetLastError(); - char* msg = gpr_format_message(wsa_last_error); + int wsa_error_code = 0; + if (SendWriteBuf(&bytes_sent, nullptr, &wsa_error_code) != 0) { + wsa_error_ctx->SetWSAError(wsa_error_code); + char* msg = gpr_format_message(wsa_error_code); GRPC_CARES_TRACE_LOG( - "TrySendWriteBufSyncNonBlocking: SendWriteBuf error:|%s|. fd:|%s|", - msg, GetName()); + "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(), + wsa_error_code, msg); gpr_free(msg); - if (wsa_last_error == WSA_IO_PENDING) { - WSASetLastError(WSAEWOULDBLOCK); - write_state_ = WRITE_REQUESTED; - } + return -1; } write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent, GRPC_SLICE_LENGTH(write_buf_)); return bytes_sent; } - ares_ssize_t SendV(const struct iovec* iov, int iov_count) { - GRPC_CARES_TRACE_LOG("SendV called on fd:|%s|. Current write state: %d", - GetName(), write_state_); - switch (write_state_) { + ares_ssize_t SendVTCP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov, + int iov_count) { + // The "sendv" handler on TCP sockets buffers up write + // requests and returns an artifical WSAEWOULDBLOCK. Writing that buffer out + // in the background, and making further send progress in general, will + // happen as long as c-ares continues to show interest in writeability on + // this fd. + GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d", + GetName(), tcp_write_state_); + switch (tcp_write_state_) { case WRITE_IDLE: + tcp_write_state_ = WRITE_REQUESTED; GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0); grpc_slice_unref_internal(write_buf_); write_buf_ = FlattenIovec(iov, iov_count); - return TrySendWriteBufSyncNonBlocking(); + wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); + return -1; case WRITE_REQUESTED: case WRITE_PENDING: - WSASetLastError(WSAEWOULDBLOCK); + wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); return -1; case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY: + // c-ares is retrying a send on data that we previously returned + // WSAEWOULDBLOCK for, but then subsequently wrote out in the + // background. Right now, we assume that c-ares is retrying the same + // send again. If c-ares still needs to send even more data, we'll get + // to it eventually. grpc_slice currently_attempted = FlattenIovec(iov, iov_count); GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >= GRPC_SLICE_LENGTH(write_buf_)); @@ -272,31 +434,159 @@ class GrpcPolledFdWindows : public GrpcPolledFd { GRPC_SLICE_START_PTR(write_buf_)[i]); total_sent++; } - grpc_slice_unref_internal(write_buf_); - write_buf_ = - grpc_slice_sub_no_ref(currently_attempted, total_sent, - GRPC_SLICE_LENGTH(currently_attempted)); - write_state_ = WRITE_IDLE; - total_sent += TrySendWriteBufSyncNonBlocking(); + grpc_slice_unref_internal(currently_attempted); + tcp_write_state_ = WRITE_IDLE; return total_sent; } abort(); } - int Connect(const struct sockaddr* target, ares_socklen_t target_len) { + static void OnTcpConnectLocked(void* arg, grpc_error* error) { + GrpcPolledFdWindows* grpc_polled_fd = + static_cast(arg); + grpc_polled_fd->InnerOnTcpConnectLocked(error); + } + + void InnerOnTcpConnectLocked(grpc_error* error) { + GRPC_CARES_TRACE_LOG( + "fd:%s InnerOnTcpConnectLocked error:|%s| " + "pending_register_for_readable:%" PRIdPTR + " pending_register_for_writeable:%" PRIdPTR, + GetName(), grpc_error_string(error), + pending_continue_register_for_on_readable_locked_, + pending_continue_register_for_on_writeable_locked_); + GPR_ASSERT(!connect_done_); + connect_done_ = true; + GPR_ASSERT(wsa_connect_error_ == 0); + if (error == GRPC_ERROR_NONE) { + DWORD transfered_bytes = 0; + DWORD flags; + BOOL wsa_success = WSAGetOverlappedResult( + grpc_winsocket_wrapped_socket(winsocket_), + &winsocket_->write_info.overlapped, &transfered_bytes, FALSE, &flags); + GPR_ASSERT(transfered_bytes == 0); + if (!wsa_success) { + wsa_connect_error_ = WSAGetLastError(); + char* msg = gpr_format_message(wsa_connect_error_); + GRPC_CARES_TRACE_LOG( + "fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d " + "msg:|%s|", + GetName(), wsa_connect_error_, msg); + gpr_free(msg); + } + } else { + // Spoof up an error code that will cause any future c-ares operations on + // this fd to abort. + wsa_connect_error_ = WSA_OPERATION_ABORTED; + } + if (pending_continue_register_for_on_readable_locked_ != nullptr) { + GRPC_CLOSURE_SCHED(pending_continue_register_for_on_readable_locked_, + GRPC_ERROR_NONE); + } + if (pending_continue_register_for_on_writeable_locked_ != nullptr) { + GRPC_CLOSURE_SCHED(pending_continue_register_for_on_writeable_locked_, + GRPC_ERROR_NONE); + } + } + + int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, + ares_socklen_t target_len) { + switch (socket_type_) { + case SOCK_DGRAM: + return ConnectUDP(wsa_error_ctx, target, target_len); + case SOCK_STREAM: + return ConnectTCP(wsa_error_ctx, target, target_len); + default: + abort(); + } + } + + int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, + ares_socklen_t target_len) { + GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName()); + GPR_ASSERT(!connect_done_); + GPR_ASSERT(wsa_connect_error_ == 0); SOCKET s = grpc_winsocket_wrapped_socket(winsocket_); - GRPC_CARES_TRACE_LOG("Connect: fd:|%s|", GetName()); int out = WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr); - if (out != 0) { + wsa_connect_error_ = WSAGetLastError(); + wsa_error_ctx->SetWSAError(wsa_connect_error_); + connect_done_ = true; + char* msg = gpr_format_message(wsa_connect_error_); + GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(), + wsa_connect_error_, msg); + gpr_free(msg); + // c-ares expects a posix-style connect API + return out == 0 ? 0 : -1; + } + + int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, + ares_socklen_t target_len) { + GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName()); + LPFN_CONNECTEX ConnectEx; + GUID guid = WSAID_CONNECTEX; + DWORD ioctl_num_bytes; + SOCKET s = grpc_winsocket_wrapped_socket(winsocket_); + if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, nullptr, + nullptr) != 0) { + int wsa_last_error = WSAGetLastError(); + wsa_error_ctx->SetWSAError(wsa_last_error); + char* msg = gpr_format_message(wsa_last_error); + GRPC_CARES_TRACE_LOG( + "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d " + "msg:|%s|", + GetName(), wsa_last_error, msg); + gpr_free(msg); + connect_done_ = true; + wsa_connect_error_ = wsa_last_error; + return -1; + } + grpc_resolved_address wildcard4_addr; + grpc_resolved_address wildcard6_addr; + grpc_sockaddr_make_wildcards(0, &wildcard4_addr, &wildcard6_addr); + grpc_resolved_address* local_address = nullptr; + if (address_family_ == AF_INET) { + local_address = &wildcard4_addr; + } else { + local_address = &wildcard6_addr; + } + if (bind(s, (struct sockaddr*)local_address->addr, + (int)local_address->len) != 0) { int wsa_last_error = WSAGetLastError(); + wsa_error_ctx->SetWSAError(wsa_last_error); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG("Connect error code:|%d|, msg:|%s|. fd:|%s|", - wsa_last_error, msg, GetName()); + GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(), + wsa_last_error, msg); gpr_free(msg); - // c-ares expects a posix-style connect API + connect_done_ = true; + wsa_connect_error_ = wsa_last_error; + return -1; + } + int out = 0; + if (ConnectEx(s, target, target_len, nullptr, 0, nullptr, + &winsocket_->write_info.overlapped) == 0) { out = -1; + int wsa_last_error = WSAGetLastError(); + wsa_error_ctx->SetWSAError(wsa_last_error); + char* msg = gpr_format_message(wsa_last_error); + GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(), + wsa_last_error, msg); + gpr_free(msg); + if (wsa_last_error == WSA_IO_PENDING) { + // c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on + // connect, but an async connect on IOCP socket will give + // WSA_IO_PENDING, so we need to convert. + wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); + } else { + // By returning a non-retryable error to c-ares at this point, + // we're aborting the possibility of any future operations on this fd. + connect_done_ = true; + wsa_connect_error_ = wsa_last_error; + return -1; + } } + grpc_socket_notify_on_write(winsocket_, &on_tcp_connect_locked_); return out; } @@ -319,12 +609,13 @@ class GrpcPolledFdWindows : public GrpcPolledFd { * in subsequent c-ares reads. */ if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) { GRPC_ERROR_UNREF(error); - char* msg = gpr_format_message(winsocket_->read_info.wsa_error); + error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error, + "OnIocpReadableInner"); GRPC_CARES_TRACE_LOG( - "OnIocpReadableInner. winsocket error:|%s|. fd:|%s|", msg, - GetName()); - error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - gpr_free(msg); + "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error " + "code:|%d| msg:|%s|", + GetName(), winsocket_->read_info.wsa_error, + grpc_error_string(error)); } } } @@ -337,8 +628,8 @@ class GrpcPolledFdWindows : public GrpcPolledFd { read_buf_ = grpc_empty_slice(); } GRPC_CARES_TRACE_LOG( - "OnIocpReadable finishing. read buf length now:|%d|. :fd:|%s|", - GRPC_SLICE_LENGTH(read_buf_), GetName()); + "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(), + GRPC_SLICE_LENGTH(read_buf_)); ScheduleAndNullReadClosure(error); } @@ -349,22 +640,26 @@ class GrpcPolledFdWindows : public GrpcPolledFd { void OnIocpWriteableInner(grpc_error* error) { GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName()); + GPR_ASSERT(socket_type_ == SOCK_STREAM); if (error == GRPC_ERROR_NONE) { if (winsocket_->write_info.wsa_error != 0) { - char* msg = gpr_format_message(winsocket_->write_info.wsa_error); - GRPC_CARES_TRACE_LOG( - "OnIocpWriteableInner. winsocket error:|%s|. fd:|%s|", msg, - GetName()); GRPC_ERROR_UNREF(error); - error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - gpr_free(msg); + error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error, + "OnIocpWriteableInner"); + GRPC_CARES_TRACE_LOG( + "fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error " + "code:|%d| msg:|%s|", + GetName(), winsocket_->write_info.wsa_error, + grpc_error_string(error)); } } - GPR_ASSERT(write_state_ == WRITE_PENDING); + GPR_ASSERT(tcp_write_state_ == WRITE_PENDING); if (error == GRPC_ERROR_NONE) { - write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY; + tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY; write_buf_ = grpc_slice_sub_no_ref( write_buf_, 0, winsocket_->write_info.bytes_transfered); + GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d", + GetName(), winsocket_->write_info.bytes_transfered); } else { grpc_slice_unref_internal(write_buf_); write_buf_ = grpc_empty_slice(); @@ -386,9 +681,22 @@ class GrpcPolledFdWindows : public GrpcPolledFd { grpc_closure outer_read_closure_; grpc_closure outer_write_closure_; grpc_winsocket* winsocket_; - WriteState write_state_; + // tcp_write_state_ is only used on TCP GrpcPolledFds + WriteState tcp_write_state_; char* name_ = nullptr; bool gotten_into_driver_list_; + int address_family_; + int socket_type_; + grpc_closure on_tcp_connect_locked_; + bool connect_done_ = false; + int wsa_connect_error_ = 0; + // We don't run register_for_{readable,writeable} logic until + // a socket is connected. In the interim, we queue readable/writeable + // registrations with the following state. + grpc_closure continue_register_for_on_readable_locked_; + grpc_closure continue_register_for_on_writeable_locked_; + grpc_closure* pending_continue_register_for_on_readable_locked_ = nullptr; + grpc_closure* pending_continue_register_for_on_writeable_locked_ = nullptr; }; struct SockToPolledFdEntry { @@ -454,39 +762,53 @@ class SockToPolledFdMap { * objects. */ static ares_socket_t Socket(int af, int type, int protocol, void* user_data) { + if (type != SOCK_DGRAM && type != SOCK_STREAM) { + GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type); + return INVALID_SOCKET; + } SockToPolledFdMap* map = static_cast(user_data); SOCKET s = WSASocket(af, type, protocol, nullptr, 0, grpc_get_default_wsa_socket_flags()); if (s == INVALID_SOCKET) { + GRPC_CARES_TRACE_LOG( + "WSASocket failed with params af:%d type:%d protocol:%d", af, type, + protocol); return s; } grpc_tcp_set_non_block(s); GrpcPolledFdWindows* polled_fd = - New(s, map->combiner_); + New(s, map->combiner_, af, type); + GRPC_CARES_TRACE_LOG( + "fd:|%s| created with params af:%d type:%d protocol:%d", + polled_fd->GetName(), af, type, protocol); map->AddNewSocket(s, polled_fd); return s; } static int Connect(ares_socket_t as, const struct sockaddr* target, ares_socklen_t target_len, void* user_data) { + WSAErrorContext wsa_error_ctx; SockToPolledFdMap* map = static_cast(user_data); GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as); - return polled_fd->Connect(target, target_len); + return polled_fd->Connect(&wsa_error_ctx, target, target_len); } static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov, int iovec_count, void* user_data) { + WSAErrorContext wsa_error_ctx; SockToPolledFdMap* map = static_cast(user_data); GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as); - return polled_fd->SendV(iov, iovec_count); + return polled_fd->SendV(&wsa_error_ctx, iov, iovec_count); } static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len, int flags, struct sockaddr* from, ares_socklen_t* from_len, void* user_data) { + WSAErrorContext wsa_error_ctx; SockToPolledFdMap* map = static_cast(user_data); GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as); - return polled_fd->RecvFrom(data, data_len, flags, from, from_len); + return polled_fd->RecvFrom(&wsa_error_ctx, data, data_len, flags, from, + from_len); } static int CloseSocket(SOCKET s, void* user_data) {