Merge branch 'error' of github.com:ctiller/grpc into error

pull/6897/head
Craig Tiller 9 years ago
commit 5b9ce77d9f
  1. 41
      src/core/lib/iomgr/iocp_windows.c
  2. 8
      src/core/lib/iomgr/iocp_windows.h
  3. 58
      src/core/lib/iomgr/socket_windows.c
  4. 11
      src/core/lib/iomgr/socket_windows.h
  5. 12
      src/core/lib/iomgr/tcp_client_windows.c
  6. 6
      src/core/lib/iomgr/tcp_server_windows.c
  7. 1
      src/core/lib/iomgr/tcp_windows.c
  8. 1
      test/core/surface/concurrent_connectivity_test.c

@ -104,7 +104,6 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
} else if (overlapped == &socket->read_info.overlapped) {
info = &socket->read_info;
} else {
gpr_log(GPR_ERROR, "Unknown IOCP operation");
abort();
}
success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
@ -112,16 +111,7 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
closure = info->closure;
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
}
gpr_mu_unlock(&socket->state_mu);
grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
grpc_socket_become_ready(exec_ctx, socket, info);
return GRPC_IOCP_WORK_WORK;
}
@ -176,33 +166,4 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
GPR_ASSERT(ret == g_iocp);
}
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket, grpc_closure *closure,
grpc_winsocket_callback_info *info) {
GPR_ASSERT(info->closure == NULL);
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
} else {
info->closure = closure;
}
gpr_mu_unlock(&socket->state_mu);
}
void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket,
grpc_closure *closure) {
socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info);
}
void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_closure *closure) {
socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info);
}
#endif /* GPR_WINSOCK_SOCKET */

@ -52,12 +52,4 @@ void grpc_iocp_flush(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
grpc_winsocket *winsocket,
grpc_closure *closure);
void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx,
grpc_winsocket *winsocket,
grpc_closure *closure);
#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */

@ -91,10 +91,66 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
closesocket(winsocket->socket);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
static void destroy(grpc_winsocket *winsocket) {
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
gpr_mu_destroy(&winsocket->state_mu);
gpr_free(winsocket);
}
static bool check_destroyable(grpc_winsocket *winsocket) {
return winsocket->destroy_called == true && winsocket->write_info.closure == NULL && winsocket->read_info.closure == NULL;
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
gpr_mu_lock(&winsocket->state_mu);
GPR_ASSERT(!winsocket->destroy_called);
winsocket->destroy_called = true;
bool should_destroy = check_destroyable(winsocket);
gpr_mu_unlock(&winsocket->state_mu);
if (should_destroy) destroy(winsocket);
}
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket, grpc_closure *closure,
grpc_winsocket_callback_info *info) {
GPR_ASSERT(info->closure == NULL);
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
} else {
info->closure = closure;
}
gpr_mu_unlock(&socket->state_mu);
}
void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket,
grpc_closure *closure) {
socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info);
}
void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_closure *closure) {
socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info);
}
void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_winsocket_callback_info *info) {
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
grpc_exec_ctx_push(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
}
bool should_destroy = check_destroyable(socket);
gpr_mu_unlock(&socket->state_mu);
if (should_destroy) destroy(socket);
}
#endif /* GPR_WINSOCK_SOCKET */

@ -81,6 +81,7 @@ typedef struct grpc_winsocket_callback_info {
is closer to what happens in posix world. */
typedef struct grpc_winsocket {
SOCKET socket;
bool destroy_called;
grpc_winsocket_callback_info write_info;
grpc_winsocket_callback_info read_info;
@ -108,4 +109,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket);
/* Destroy a socket. Should only be called if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket);
void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
grpc_winsocket *winsocket,
grpc_closure *closure);
void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx,
grpc_winsocket *winsocket,
grpc_closure *closure);
void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *winsocket, grpc_winsocket_callback_info *ci);
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */

@ -78,18 +78,18 @@ static void async_connect_unlock_and_cleanup(async_connect *ac,
static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
if (ac->socket != NULL) {
grpc_winsocket_shutdown(ac->socket);
grpc_winsocket *socket = ac->socket;
ac->socket = NULL;
if (socket != NULL) {
grpc_winsocket_shutdown(socket);
}
async_connect_unlock_and_cleanup(ac, ac->socket);
async_connect_unlock_and_cleanup(ac, socket);
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint **ep = ac->endpoint;
GPR_ASSERT(*ep == NULL);
grpc_winsocket_callback_info *info = &ac->socket->write_info;
grpc_closure *on_done = ac->on_done;
GRPC_ERROR_REF(error);
@ -106,7 +106,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (error == GRPC_ERROR_NONE && socket != NULL) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
BOOL wsa_success = WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
&transfered_bytes, FALSE, &flags);
GPR_ASSERT(transfered_bytes == 0);
if (!wsa_success) {

@ -449,7 +449,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len, int *port) {
grpc_tcp_listener *sp;
grpc_tcp_listener *sp = NULL;
SOCKET sock;
struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in6 wildcard;
@ -512,6 +512,10 @@ done:
"Failed to add port to server", &error, 1);
GRPC_ERROR_UNREF(error);
error = error_out;
*port = -1;
} else {
GPR_ASSERT(sp != NULL);
*port = sp->port;
}
return error;
}

@ -167,7 +167,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
if (error == GRPC_ERROR_NONE) {
if (info->wsa_error != 0 && !tcp->shutting_down) {
char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
error = GRPC_ERROR_CREATE(utf8_message);
gpr_free(utf8_message);
gpr_slice_unref(tcp->read_slice);

@ -118,6 +118,7 @@ void bad_server_thread(void *vargs) {
addr.ss_family = AF_INET;
error =
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len, &port);
GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error));
GPR_ASSERT(port > 0);
gpr_asprintf(&args->addr, "localhost:%d", port);

Loading…
Cancel
Save