Simplify and fix the c-ares TCP path on Windows

pull/19397/head
Alex Polcyn 6 years ago committed by Alexander Polcyn
parent 6c46217dce
commit 03797e0ec3
  1. 460
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc

@ -31,6 +31,9 @@
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_windows.h" #include "src/core/lib/iomgr/tcp_windows.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
@ -50,6 +53,32 @@ struct iovec {
namespace grpc_core { namespace grpc_core {
/* c-ares reads and takes action on the error codes of the
* "virtual socket operations" in this file, via the WSAGetLastError
* APIs. If code in this file wants to set a specific WSA error that
* c-ares should read, it must do so by calling SetWSAError() on the
* WSAErrorContext instance passed to it. A WSAErrorContext must only be
* instantiated at the top of the virtual socket function callstack. */
class WSAErrorContext {
public:
explicit WSAErrorContext(){};
~WSAErrorContext() {
if (error_ != 0) {
WSASetLastError(error_);
}
}
/* Disallow copy and assignment operators */
WSAErrorContext(const WSAErrorContext&) = delete;
WSAErrorContext& operator=(const WSAErrorContext&) = delete;
void SetWSAError(int error) { error_ = error; }
private:
int error_ = 0;
};
/* c-ares creates its own sockets and is meant to read them when readable and /* c-ares creates its own sockets and is meant to read them when readable and
* write them when writeable. To fit this socket usage model into the grpc * write them when writeable. To fit this socket usage model into the grpc
* windows poller (which gives notifications when attempted reads and writes are * windows poller (which gives notifications when attempted reads and writes are
@ -68,11 +97,14 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY, WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
}; };
GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner) GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner,
int address_family, int socket_type)
: read_buf_(grpc_empty_slice()), : read_buf_(grpc_empty_slice()),
write_buf_(grpc_empty_slice()), write_buf_(grpc_empty_slice()),
write_state_(WRITE_IDLE), tcp_write_state_(WRITE_IDLE),
gotten_into_driver_list_(false) { gotten_into_driver_list_(false),
address_family_(address_family),
socket_type_(socket_type) {
gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as); gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
winsocket_ = grpc_winsocket_create(as, name_); winsocket_ = grpc_winsocket_create(as, name_);
combiner_ = GRPC_COMBINER_REF(combiner, name_); combiner_ = GRPC_COMBINER_REF(combiner, name_);
@ -82,6 +114,16 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
GRPC_CLOSURE_INIT(&outer_write_closure_, GRPC_CLOSURE_INIT(&outer_write_closure_,
&GrpcPolledFdWindows::OnIocpWriteable, this, &GrpcPolledFdWindows::OnIocpWriteable, this,
grpc_combiner_scheduler(combiner_)); grpc_combiner_scheduler(combiner_));
GRPC_CLOSURE_INIT(&on_tcp_connect_locked_,
&GrpcPolledFdWindows::OnTcpConnectLocked, this,
grpc_combiner_scheduler(combiner_));
GRPC_CLOSURE_INIT(&continue_register_for_on_readable_locked_,
&GrpcPolledFdWindows::ContinueRegisterForOnReadableLocked,
this, grpc_combiner_scheduler(combiner_));
GRPC_CLOSURE_INIT(
&continue_register_for_on_writeable_locked_,
&GrpcPolledFdWindows::ContinueRegisterForOnWriteableLocked, this,
grpc_combiner_scheduler(combiner_));
} }
~GrpcPolledFdWindows() { ~GrpcPolledFdWindows() {
@ -111,6 +153,33 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
grpc_slice_unref_internal(read_buf_); grpc_slice_unref_internal(read_buf_);
GPR_ASSERT(!read_buf_has_data_); GPR_ASSERT(!read_buf_has_data_);
read_buf_ = GRPC_SLICE_MALLOC(4192); read_buf_ = GRPC_SLICE_MALLOC(4192);
if (connect_done_) {
GRPC_CLOSURE_SCHED(&continue_register_for_on_readable_locked_,
GRPC_ERROR_NONE);
} else {
GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == nullptr);
pending_continue_register_for_on_readable_locked_ =
&continue_register_for_on_readable_locked_;
}
}
static void ContinueRegisterForOnReadableLocked(void* arg,
grpc_error* unused_error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
grpc_polled_fd->InnerContinueRegisterForOnReadableLocked(GRPC_ERROR_NONE);
}
void InnerContinueRegisterForOnReadableLocked(grpc_error* unused_error) {
GRPC_CARES_TRACE_LOG(
"fd:|%s| InnerContinueRegisterForOnReadableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GPR_ASSERT(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
return;
}
WSABUF buffer; WSABUF buffer;
buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_); buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_);
buffer.len = GRPC_SLICE_LENGTH(read_buf_); buffer.len = GRPC_SLICE_LENGTH(read_buf_);
@ -123,13 +192,14 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
&winsocket_->read_info.overlapped, nullptr)) { &winsocket_->read_info.overlapped, nullptr)) {
int wsa_last_error = WSAGetLastError(); int wsa_last_error = WSAGetLastError();
char* msg = gpr_format_message(wsa_last_error); char* msg = gpr_format_message(wsa_last_error);
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"RegisterForOnReadableLocked: WSARecvFrom error:|%s|. fd:|%s|", msg, "fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| "
GetName()); "msg:|%s|",
GetName(), wsa_last_error, msg);
gpr_free(msg); gpr_free(msg);
if (wsa_last_error != WSA_IO_PENDING) { if (wsa_last_error != WSA_IO_PENDING) {
ScheduleAndNullReadClosure(error); ScheduleAndNullReadClosure(
GRPC_WSA_ERROR(wsa_last_error, "WSARecvFrom"));
return; return;
} }
} }
@ -137,25 +207,70 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
} }
void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
if (socket_type_ == SOCK_DGRAM) {
GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called",
GetName());
} else {
GPR_ASSERT(socket_type_ == SOCK_STREAM);
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"RegisterForOnWriteableLocked. fd:|%s|. Current write state: %d", "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d",
GetName(), write_state_); GetName(), tcp_write_state_);
}
GPR_ASSERT(write_closure_ == nullptr); GPR_ASSERT(write_closure_ == nullptr);
write_closure_ = write_closure; write_closure_ = write_closure;
switch (write_state_) { if (connect_done_) {
GRPC_CLOSURE_SCHED(&continue_register_for_on_writeable_locked_,
GRPC_ERROR_NONE);
} else {
GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == nullptr);
pending_continue_register_for_on_writeable_locked_ =
&continue_register_for_on_writeable_locked_;
}
}
static void ContinueRegisterForOnWriteableLocked(void* arg,
grpc_error* unused_error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
grpc_polled_fd->InnerContinueRegisterForOnWriteableLocked(GRPC_ERROR_NONE);
}
void InnerContinueRegisterForOnWriteableLocked(grpc_error* unused_error) {
GRPC_CARES_TRACE_LOG(
"fd:|%s| InnerContinueRegisterForOnWriteableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GPR_ASSERT(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullWriteClosure(
GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
return;
}
if (socket_type_ == SOCK_DGRAM) {
ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
} else {
GPR_ASSERT(socket_type_ == SOCK_STREAM);
int wsa_error_code = 0;
switch (tcp_write_state_) {
case WRITE_IDLE: case WRITE_IDLE:
ScheduleAndNullWriteClosure(GRPC_ERROR_NONE); ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
break; break;
case WRITE_REQUESTED: case WRITE_REQUESTED:
write_state_ = WRITE_PENDING; tcp_write_state_ = WRITE_PENDING;
SendWriteBuf(nullptr, &winsocket_->write_info.overlapped); if (SendWriteBuf(nullptr, &winsocket_->write_info.overlapped,
&wsa_error_code) != 0) {
ScheduleAndNullWriteClosure(
GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)"));
} else {
grpc_socket_notify_on_write(winsocket_, &outer_write_closure_); grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
}
break; break;
case WRITE_PENDING: case WRITE_PENDING:
case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY: case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
abort(); abort();
} }
} }
}
bool IsFdStillReadableLocked() override { bool IsFdStillReadableLocked() override {
return GRPC_SLICE_LENGTH(read_buf_) > 0; return GRPC_SLICE_LENGTH(read_buf_) > 0;
@ -171,13 +286,15 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
const char* GetName() override { return name_; } const char* GetName() override { return name_; }
ares_ssize_t RecvFrom(void* data, ares_socket_t data_len, int flags, ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
ares_socket_t data_len, int flags,
struct sockaddr* from, ares_socklen_t* from_len) { struct sockaddr* from, ares_socklen_t* from_len) {
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"RecvFrom called on fd:|%s|. Current read buf length:|%d|", GetName(), "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
GRPC_SLICE_LENGTH(read_buf_)); "length:|%d|",
GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
if (!read_buf_has_data_) { if (!read_buf_has_data_) {
WSASetLastError(WSAEWOULDBLOCK); wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1; return -1;
} }
ares_ssize_t bytes_read = 0; ares_ssize_t bytes_read = 0;
@ -215,54 +332,99 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
return out; return out;
} }
int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped) { int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped,
int* wsa_error_code) {
WSABUF buf; WSABUF buf;
buf.len = GRPC_SLICE_LENGTH(write_buf_); buf.len = GRPC_SLICE_LENGTH(write_buf_);
buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_); buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_);
DWORD flags = 0; DWORD flags = 0;
int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1, int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
bytes_sent_ptr, flags, overlapped, nullptr); bytes_sent_ptr, flags, overlapped, nullptr);
*wsa_error_code = WSAGetLastError();
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"WSASend: name:%s. buf len:%d. bytes sent: %d. overlapped %p. return " "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
"val: %d", "overlapped:%p "
GetName(), buf.len, *bytes_sent_ptr, overlapped, out); "return:%d *wsa_error_code:%d",
GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
overlapped, out, *wsa_error_code);
return out; return out;
} }
ares_ssize_t TrySendWriteBufSyncNonBlocking() { ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
GPR_ASSERT(write_state_ == WRITE_IDLE); int iov_count) {
GRPC_CARES_TRACE_LOG(
"fd:|%s| SendV called connect_done_:%d was_connect_error_:%d",
GetName(), connect_done_, wsa_connect_error_);
if (!connect_done_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
}
if (wsa_connect_error_ != 0) {
wsa_error_ctx->SetWSAError(wsa_connect_error_);
return -1;
}
switch (socket_type_) {
case SOCK_DGRAM:
return SendVUDP(wsa_error_ctx, iov, iov_count);
case SOCK_STREAM:
return SendVTCP(wsa_error_ctx, iov, iov_count);
default:
abort();
}
}
ares_ssize_t SendVUDP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
int iov_count) {
// c-ares doesn't handle retryable errors on writes of UDP sockets.
// Therefore, the sendv handler for UDP sockets must only attempt
// to write everything inline.
GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
grpc_slice_unref_internal(write_buf_);
write_buf_ = FlattenIovec(iov, iov_count);
DWORD bytes_sent = 0; DWORD bytes_sent = 0;
if (SendWriteBuf(&bytes_sent, nullptr) != 0) { int wsa_error_code = 0;
int wsa_last_error = WSAGetLastError(); if (SendWriteBuf(&bytes_sent, nullptr, &wsa_error_code) != 0) {
char* msg = gpr_format_message(wsa_last_error); wsa_error_ctx->SetWSAError(wsa_error_code);
char* msg = gpr_format_message(wsa_error_code);
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"TrySendWriteBufSyncNonBlocking: SendWriteBuf error:|%s|. fd:|%s|", "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
msg, GetName()); wsa_error_code, msg);
gpr_free(msg); gpr_free(msg);
if (wsa_last_error == WSA_IO_PENDING) { return -1;
WSASetLastError(WSAEWOULDBLOCK);
write_state_ = WRITE_REQUESTED;
}
} }
write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent, write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
GRPC_SLICE_LENGTH(write_buf_)); GRPC_SLICE_LENGTH(write_buf_));
return bytes_sent; return bytes_sent;
} }
ares_ssize_t SendV(const struct iovec* iov, int iov_count) { ares_ssize_t SendVTCP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
GRPC_CARES_TRACE_LOG("SendV called on fd:|%s|. Current write state: %d", int iov_count) {
GetName(), write_state_); // The "sendv" handler on TCP sockets buffers up write
switch (write_state_) { // requests and returns an artifical WSAEWOULDBLOCK. Writing that buffer out
// in the background, and making further send progress in general, will
// happen as long as c-ares continues to show interest in writeability on
// this fd.
GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
GetName(), tcp_write_state_);
switch (tcp_write_state_) {
case WRITE_IDLE: case WRITE_IDLE:
tcp_write_state_ = WRITE_REQUESTED;
GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0); GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
grpc_slice_unref_internal(write_buf_); grpc_slice_unref_internal(write_buf_);
write_buf_ = FlattenIovec(iov, iov_count); write_buf_ = FlattenIovec(iov, iov_count);
return TrySendWriteBufSyncNonBlocking(); wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
case WRITE_REQUESTED: case WRITE_REQUESTED:
case WRITE_PENDING: case WRITE_PENDING:
WSASetLastError(WSAEWOULDBLOCK); wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1; return -1;
case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY: case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
// c-ares is retrying a send on data that we previously returned
// WSAEWOULDBLOCK for, but then subsequently wrote out in the
// background. Right now, we assume that c-ares is retrying the same
// send again. If c-ares still needs to send even more data, we'll get
// to it eventually.
grpc_slice currently_attempted = FlattenIovec(iov, iov_count); grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >= GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
GRPC_SLICE_LENGTH(write_buf_)); GRPC_SLICE_LENGTH(write_buf_));
@ -272,31 +434,159 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
GRPC_SLICE_START_PTR(write_buf_)[i]); GRPC_SLICE_START_PTR(write_buf_)[i]);
total_sent++; total_sent++;
} }
grpc_slice_unref_internal(write_buf_); grpc_slice_unref_internal(currently_attempted);
write_buf_ = tcp_write_state_ = WRITE_IDLE;
grpc_slice_sub_no_ref(currently_attempted, total_sent,
GRPC_SLICE_LENGTH(currently_attempted));
write_state_ = WRITE_IDLE;
total_sent += TrySendWriteBufSyncNonBlocking();
return total_sent; return total_sent;
} }
abort(); abort();
} }
int Connect(const struct sockaddr* target, ares_socklen_t target_len) { static void OnTcpConnectLocked(void* arg, grpc_error* error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
grpc_polled_fd->InnerOnTcpConnectLocked(error);
}
void InnerOnTcpConnectLocked(grpc_error* error) {
GRPC_CARES_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked error:|%s| "
"pending_register_for_readable:%" PRIdPTR
" pending_register_for_writeable:%" PRIdPTR,
GetName(), grpc_error_string(error),
pending_continue_register_for_on_readable_locked_,
pending_continue_register_for_on_writeable_locked_);
GPR_ASSERT(!connect_done_);
connect_done_ = true;
GPR_ASSERT(wsa_connect_error_ == 0);
if (error == GRPC_ERROR_NONE) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(
grpc_winsocket_wrapped_socket(winsocket_),
&winsocket_->write_info.overlapped, &transfered_bytes, FALSE, &flags);
GPR_ASSERT(transfered_bytes == 0);
if (!wsa_success) {
wsa_connect_error_ = WSAGetLastError();
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_CARES_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
"msg:|%s|",
GetName(), wsa_connect_error_, msg);
gpr_free(msg);
}
} else {
// Spoof up an error code that will cause any future c-ares operations on
// this fd to abort.
wsa_connect_error_ = WSA_OPERATION_ABORTED;
}
if (pending_continue_register_for_on_readable_locked_ != nullptr) {
GRPC_CLOSURE_SCHED(pending_continue_register_for_on_readable_locked_,
GRPC_ERROR_NONE);
}
if (pending_continue_register_for_on_writeable_locked_ != nullptr) {
GRPC_CLOSURE_SCHED(pending_continue_register_for_on_writeable_locked_,
GRPC_ERROR_NONE);
}
}
int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
switch (socket_type_) {
case SOCK_DGRAM:
return ConnectUDP(wsa_error_ctx, target, target_len);
case SOCK_STREAM:
return ConnectTCP(wsa_error_ctx, target, target_len);
default:
abort();
}
}
int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName());
GPR_ASSERT(!connect_done_);
GPR_ASSERT(wsa_connect_error_ == 0);
SOCKET s = grpc_winsocket_wrapped_socket(winsocket_); SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
GRPC_CARES_TRACE_LOG("Connect: fd:|%s|", GetName());
int out = int out =
WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr); WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
if (out != 0) { wsa_connect_error_ = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_connect_error_);
connect_done_ = true;
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(),
wsa_connect_error_, msg);
gpr_free(msg);
// c-ares expects a posix-style connect API
return out == 0 ? 0 : -1;
}
int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName());
LPFN_CONNECTEX ConnectEx;
GUID guid = WSAID_CONNECTEX;
DWORD ioctl_num_bytes;
SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, nullptr,
nullptr) != 0) {
int wsa_last_error = WSAGetLastError(); int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error); char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("Connect error code:|%d|, msg:|%s|. fd:|%s|", GRPC_CARES_TRACE_LOG(
wsa_last_error, msg, GetName()); "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
"msg:|%s|",
GetName(), wsa_last_error, msg);
gpr_free(msg); gpr_free(msg);
// c-ares expects a posix-style connect API connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
return -1;
}
grpc_resolved_address wildcard4_addr;
grpc_resolved_address wildcard6_addr;
grpc_sockaddr_make_wildcards(0, &wildcard4_addr, &wildcard6_addr);
grpc_resolved_address* local_address = nullptr;
if (address_family_ == AF_INET) {
local_address = &wildcard4_addr;
} else {
local_address = &wildcard6_addr;
}
if (bind(s, (struct sockaddr*)local_address->addr,
(int)local_address->len) != 0) {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(),
wsa_last_error, msg);
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
return -1;
}
int out = 0;
if (ConnectEx(s, target, target_len, nullptr, 0, nullptr,
&winsocket_->write_info.overlapped) == 0) {
out = -1; out = -1;
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(),
wsa_last_error, msg);
gpr_free(msg);
if (wsa_last_error == WSA_IO_PENDING) {
// c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
// connect, but an async connect on IOCP socket will give
// WSA_IO_PENDING, so we need to convert.
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
} else {
// By returning a non-retryable error to c-ares at this point,
// we're aborting the possibility of any future operations on this fd.
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
return -1;
} }
}
grpc_socket_notify_on_write(winsocket_, &on_tcp_connect_locked_);
return out; return out;
} }
@ -319,12 +609,13 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
* in subsequent c-ares reads. */ * in subsequent c-ares reads. */
if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) { if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
char* msg = gpr_format_message(winsocket_->read_info.wsa_error); error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error,
"OnIocpReadableInner");
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"OnIocpReadableInner. winsocket error:|%s|. fd:|%s|", msg, "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
GetName()); "code:|%d| msg:|%s|",
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); GetName(), winsocket_->read_info.wsa_error,
gpr_free(msg); grpc_error_string(error));
} }
} }
} }
@ -337,8 +628,8 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
read_buf_ = grpc_empty_slice(); read_buf_ = grpc_empty_slice();
} }
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"OnIocpReadable finishing. read buf length now:|%d|. :fd:|%s|", "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
GRPC_SLICE_LENGTH(read_buf_), GetName()); GRPC_SLICE_LENGTH(read_buf_));
ScheduleAndNullReadClosure(error); ScheduleAndNullReadClosure(error);
} }
@ -349,22 +640,26 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
void OnIocpWriteableInner(grpc_error* error) { void OnIocpWriteableInner(grpc_error* error) {
GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName()); GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
GPR_ASSERT(socket_type_ == SOCK_STREAM);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
if (winsocket_->write_info.wsa_error != 0) { if (winsocket_->write_info.wsa_error != 0) {
char* msg = gpr_format_message(winsocket_->write_info.wsa_error);
GRPC_CARES_TRACE_LOG(
"OnIocpWriteableInner. winsocket error:|%s|. fd:|%s|", msg,
GetName());
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error,
gpr_free(msg); "OnIocpWriteableInner");
GRPC_CARES_TRACE_LOG(
"fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->write_info.wsa_error,
grpc_error_string(error));
} }
} }
GPR_ASSERT(write_state_ == WRITE_PENDING); GPR_ASSERT(tcp_write_state_ == WRITE_PENDING);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY; tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
write_buf_ = grpc_slice_sub_no_ref( write_buf_ = grpc_slice_sub_no_ref(
write_buf_, 0, winsocket_->write_info.bytes_transfered); write_buf_, 0, winsocket_->write_info.bytes_transfered);
GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d",
GetName(), winsocket_->write_info.bytes_transfered);
} else { } else {
grpc_slice_unref_internal(write_buf_); grpc_slice_unref_internal(write_buf_);
write_buf_ = grpc_empty_slice(); write_buf_ = grpc_empty_slice();
@ -386,9 +681,22 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
grpc_closure outer_read_closure_; grpc_closure outer_read_closure_;
grpc_closure outer_write_closure_; grpc_closure outer_write_closure_;
grpc_winsocket* winsocket_; grpc_winsocket* winsocket_;
WriteState write_state_; // tcp_write_state_ is only used on TCP GrpcPolledFds
WriteState tcp_write_state_;
char* name_ = nullptr; char* name_ = nullptr;
bool gotten_into_driver_list_; bool gotten_into_driver_list_;
int address_family_;
int socket_type_;
grpc_closure on_tcp_connect_locked_;
bool connect_done_ = false;
int wsa_connect_error_ = 0;
// We don't run register_for_{readable,writeable} logic until
// a socket is connected. In the interim, we queue readable/writeable
// registrations with the following state.
grpc_closure continue_register_for_on_readable_locked_;
grpc_closure continue_register_for_on_writeable_locked_;
grpc_closure* pending_continue_register_for_on_readable_locked_ = nullptr;
grpc_closure* pending_continue_register_for_on_writeable_locked_ = nullptr;
}; };
struct SockToPolledFdEntry { struct SockToPolledFdEntry {
@ -454,39 +762,53 @@ class SockToPolledFdMap {
* objects. * objects.
*/ */
static ares_socket_t Socket(int af, int type, int protocol, void* user_data) { static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
if (type != SOCK_DGRAM && type != SOCK_STREAM) {
GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type);
return INVALID_SOCKET;
}
SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data); SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
SOCKET s = WSASocket(af, type, protocol, nullptr, 0, SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
grpc_get_default_wsa_socket_flags()); grpc_get_default_wsa_socket_flags());
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
GRPC_CARES_TRACE_LOG(
"WSASocket failed with params af:%d type:%d protocol:%d", af, type,
protocol);
return s; return s;
} }
grpc_tcp_set_non_block(s); grpc_tcp_set_non_block(s);
GrpcPolledFdWindows* polled_fd = GrpcPolledFdWindows* polled_fd =
New<GrpcPolledFdWindows>(s, map->combiner_); New<GrpcPolledFdWindows>(s, map->combiner_, af, type);
GRPC_CARES_TRACE_LOG(
"fd:|%s| created with params af:%d type:%d protocol:%d",
polled_fd->GetName(), af, type, protocol);
map->AddNewSocket(s, polled_fd); map->AddNewSocket(s, polled_fd);
return s; return s;
} }
static int Connect(ares_socket_t as, const struct sockaddr* target, static int Connect(ares_socket_t as, const struct sockaddr* target,
ares_socklen_t target_len, void* user_data) { ares_socklen_t target_len, void* user_data) {
WSAErrorContext wsa_error_ctx;
SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data); SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as); GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
return polled_fd->Connect(target, target_len); return polled_fd->Connect(&wsa_error_ctx, target, target_len);
} }
static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov, static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
int iovec_count, void* user_data) { int iovec_count, void* user_data) {
WSAErrorContext wsa_error_ctx;
SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data); SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as); GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
return polled_fd->SendV(iov, iovec_count); return polled_fd->SendV(&wsa_error_ctx, iov, iovec_count);
} }
static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len, static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
int flags, struct sockaddr* from, int flags, struct sockaddr* from,
ares_socklen_t* from_len, void* user_data) { ares_socklen_t* from_len, void* user_data) {
WSAErrorContext wsa_error_ctx;
SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data); SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as); GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
return polled_fd->RecvFrom(data, data_len, flags, from, from_len); return polled_fd->RecvFrom(&wsa_error_ctx, data, data_len, flags, from,
from_len);
} }
static int CloseSocket(SOCKET s, void* user_data) { static int CloseSocket(SOCKET s, void* user_data) {

Loading…
Cancel
Save