Cleaning up Windows endpoint code

- removed the need to track per-operation 'outstanding' and per-socket 'orphan' state on Windows sockets
- fixed chttp2 transport endpoint shutdown management
- made a bunch of tests pass
Branch: pull/3097/head
Author: Craig Tiller, 10 years ago
Parent: ae69ad1bcf
Commit: 9f80fcf8e7
11 changed files:
  1. src/core/iomgr/iocp_windows.c (20 lines changed)
  2. src/core/iomgr/iocp_windows.h (1 line changed)
  3. src/core/iomgr/socket_windows.c (41 lines changed)
  4. src/core/iomgr/socket_windows.h (16 lines changed)
  5. src/core/iomgr/tcp_client_windows.c (9 lines changed)
  6. src/core/iomgr/tcp_server_windows.c (6 lines changed)
  7. src/core/iomgr/tcp_windows.c (114 lines changed)
  8. src/core/transport/chttp2/internal.h (3 lines changed)
  9. src/core/transport/chttp2_transport.c (32 lines changed)
  10. test/core/iomgr/endpoint_pair_test.c (7 lines changed)
  11. test/core/iomgr/endpoint_tests.c (111 lines changed)

@@ -52,7 +52,6 @@ static OVERLAPPED g_iocp_custom_overlap;
static gpr_event g_shutdown_iocp;
static gpr_event g_iocp_done;
static gpr_atm g_orphans = 0;
static gpr_atm g_custom_events = 0;
static HANDLE g_iocp;
@@ -92,22 +91,13 @@ static void do_iocp_work() {
gpr_log(GPR_ERROR, "Unknown IOCP operation");
abort();
}
GPR_ASSERT(info->outstanding);
if (socket->orphan) {
info->outstanding = 0;
if (!socket->read_info.outstanding && !socket->write_info.outstanding) {
grpc_winsocket_destroy(socket);
gpr_atm_full_fetch_add(&g_orphans, -1);
}
return;
}
success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
FALSE, &flags);
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
gpr_mu_lock(&socket->state_mu);
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->cb) {
f = info->cb;
opaque = info->opaque;
@@ -120,7 +110,7 @@ static void do_iocp_work() {
}
static void iocp_loop(void *p) {
while (gpr_atm_acq_load(&g_orphans) || gpr_atm_acq_load(&g_custom_events) ||
while (gpr_atm_acq_load(&g_custom_events) ||
!gpr_event_get(&g_shutdown_iocp)) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
@@ -175,12 +165,6 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
GPR_ASSERT(ret == g_iocp);
}
void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
GPR_ASSERT(!socket->orphan);
gpr_atm_full_fetch_add(&g_orphans, 1);
socket->orphan = 1;
}
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
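
For context on the pattern these hunks simplify: the IOCP thread parks in GetQueuedCompletionStatus() and, for each dequeued completion, asks WSAGetOverlappedResult() (with fWait = FALSE, since the operation has already finished) for the byte count and error code. A minimal standalone sketch of that pump follows; it is illustrative only, and names such as pump() are invented here rather than taken from this code:

    #include <winsock2.h>
    #include <windows.h>
    #include <stdio.h>

    /* Minimal IOCP completion pump in the shape of iocp_loop()/do_iocp_work().
       Assumes each socket was registered with CreateIoCompletionPort() using
       the SOCKET itself as the completion key. */
    static void pump(HANDLE iocp) {
      for (;;) {
        DWORD bytes = 0;
        ULONG_PTR key = 0;
        LPOVERLAPPED overlapped = NULL;
        if (!GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped,
                                       INFINITE) &&
            overlapped == NULL) {
          break; /* the port itself failed or was closed */
        }
        SOCKET s = (SOCKET)key;
        DWORD flags = 0, transferred = 0;
        BOOL success = WSAGetOverlappedResult(
            s, (LPWSAOVERLAPPED)overlapped, &transferred, FALSE, &flags);
        int wsa_error = success ? 0 : WSAGetLastError();
        /* Dispatch would happen here; the sketch just reports the result. */
        printf("completion: %lu bytes, wsa_error=%d\n",
               (unsigned long)transferred, wsa_error);
      }
    }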

@@ -42,7 +42,6 @@ void grpc_iocp_init(void);
void grpc_iocp_kick(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
void grpc_iocp_socket_orphan(grpc_winsocket *);
void grpc_socket_notify_on_write(grpc_winsocket *,
void (*cb)(void *, int success), void *opaque);

@@ -62,46 +62,13 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
mutex hold states, and that'd be unsafe to call them directly. */
int grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
int callbacks_set = 0;
SOCKET socket;
gpr_mu_lock(&winsocket->state_mu);
socket = winsocket->socket;
if (winsocket->read_info.cb) {
callbacks_set++;
grpc_iomgr_closure_init(&winsocket->shutdown_closure,
winsocket->read_info.cb,
winsocket->read_info.opaque);
grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
}
if (winsocket->write_info.cb) {
callbacks_set++;
grpc_iomgr_closure_init(&winsocket->shutdown_closure,
winsocket->write_info.cb,
winsocket->write_info.opaque);
grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
}
gpr_mu_unlock(&winsocket->state_mu);
closesocket(socket);
return callbacks_set;
}
/* Abandons a socket. Either we're going to queue it up for garbage collecting
from the IO Completion Port thread, or destroy it immediately. Note that this
mechanisms assumes that we're either always waiting for an operation, or we
explicitly know that we don't. If there is a future case where we can have
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
grpc_iocp_socket_orphan(winsocket);
} else {
grpc_winsocket_destroy(winsocket);
}
void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
shutdown(winsocket->socket, SD_BOTH);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
closesocket(winsocket->socket);
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
gpr_mu_destroy(&winsocket->state_mu);
gpr_free(winsocket);
}
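
The shutdown path above becomes two explicit phases: grpc_winsocket_shutdown() now only calls shutdown(SD_BOTH), which forces any overlapped read or write to complete with an error through the normal IOCP path, and grpc_winsocket_destroy() closes and frees the socket once nothing is pending. A hedged sketch of that ordering, with invented conn_* names standing in for the grpc types:

    #include <winsock2.h>

    /* Hypothetical owner-side teardown mirroring the new
       grpc_winsocket_shutdown/grpc_winsocket_destroy split. */
    typedef struct {
      SOCKET socket;
      int pending_ops; /* overlapped reads/writes still in flight */
    } conn;

    static void conn_shutdown(conn *c) {
      /* Phase 1: wake pending overlapped ops; each surfaces through the
         IOCP with an error and runs its completion callback. */
      shutdown(c->socket, SD_BOTH);
    }

    static void conn_on_op_complete(conn *c) {
      /* Phase 2: only the last completion may close the handle. */
      if (--c->pending_ops == 0) closesocket(c->socket);
    }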

@@ -68,8 +68,6 @@ typedef struct grpc_winsocket_callback_info {
/* The results of the overlapped operation. */
DWORD bytes_transfered;
int wsa_error;
/* A boolean indicating that we started an operation. */
int outstanding;
} grpc_winsocket_callback_info;
/* This is a wrapper to a Windows socket. A socket can have one outstanding
@@ -92,10 +90,6 @@ typedef struct grpc_winsocket {
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
int added_to_iocp;
/* A boolean to indicate that the caller has abandoned that socket, but
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan;
grpc_iomgr_closure shutdown_closure;
@@ -108,14 +102,10 @@ typedef struct grpc_winsocket {
grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name);
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. Returns the number of callbacks that got setup. */
int grpc_winsocket_shutdown(grpc_winsocket *socket);
operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket *socket);
/* Abandon a socket. */
void grpc_winsocket_orphan(grpc_winsocket *socket);
/* Destroy a socket. Should only be called by the IO Completion Port thread,
or by grpc_winsocket_orphan if there's no pending operation. */
/* Destroy a socket. Should only be called if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */

@@ -102,7 +102,6 @@ static void on_connect(void *acp, int from_iocp) {
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
info->outstanding = 0;
GPR_ASSERT(transfered_bytes == 0);
if (!wsa_success) {
char *utf8_message = gpr_format_message(WSAGetLastError());
@@ -125,12 +124,10 @@ static void on_connect(void *acp, int from_iocp) {
return;
}
ac->socket->write_info.outstanding = 0;
/* If we don't have an endpoint, it means the connection failed,
so it doesn't matter if it aborted or failed. We need to orphan
that socket. */
if (!ep || aborted) grpc_winsocket_orphan(ac->socket);
if (!ep || aborted) grpc_winsocket_destroy(ac->socket);
async_connect_cleanup(ac);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
@@ -196,7 +193,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
socket = grpc_winsocket_create(sock, "client");
info = &socket->write_info;
info->outstanding = 1;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
@@ -220,7 +216,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
gpr_now(GPR_CLOCK_MONOTONIC));
socket->write_info.outstanding = 1;
grpc_socket_notify_on_write(socket, on_connect, ac);
return;
@@ -229,7 +224,7 @@ failure:
gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message);
if (socket) {
grpc_winsocket_orphan(socket);
grpc_winsocket_destroy(socket);
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}

@@ -116,7 +116,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
sp->shutting_down = 1;
s->iomgr_callbacks_pending += grpc_winsocket_shutdown(sp->socket);
grpc_winsocket_shutdown(sp->socket);
}
/* This happens asynchronously. Wait while that happens. */
while (s->active_ports || s->iomgr_callbacks_pending) {
@@ -129,7 +129,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
closed by the system. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
grpc_winsocket_orphan(sp->socket);
grpc_winsocket_destroy(sp->socket);
}
gpr_free(s->ports);
gpr_free(s);
@@ -189,7 +189,6 @@ error:
static void decrement_active_ports_and_notify(server_port *sp) {
sp->shutting_down = 0;
sp->socket->read_info.outstanding = 0;
gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) {
@@ -462,7 +461,6 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
s->ports[i].socket->read_info.outstanding = 1;
start_accept(s->ports + i);
s->active_ports++;
}

@@ -97,7 +97,7 @@ typedef struct grpc_tcp {
} grpc_tcp;
static void tcp_free(grpc_tcp *tcp) {
grpc_winsocket_orphan(tcp->socket);
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
gpr_free(tcp);
@@ -135,55 +135,35 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */
static int on_read(grpc_tcp *tcp, int from_iocp) {
static int on_read(grpc_tcp *tcp, int success) {
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
gpr_slice *slice = NULL;
size_t nslices = 0;
int success;
grpc_winsocket_callback_info *info = &socket->read_info;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
though, so we're going to do it from here. */
do_abort = 1;
}
gpr_mu_unlock(&tcp->mu);
if (do_abort) {
if (from_iocp) {
tcp->socket->read_info.outstanding = 0;
if (success) {
if (socket->read_info.wsa_error != 0) {
if (socket->read_info.wsa_error != WSAECONNRESET) {
char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message);
}
success = 0;
gpr_slice_unref(tcp->read_slice);
}
return 0;
}
GPR_ASSERT(tcp->socket->read_info.outstanding);
if (socket->read_info.wsa_error != 0) {
if (socket->read_info.wsa_error != WSAECONNRESET) {
char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message);
}
success = 0;
gpr_slice_unref(tcp->read_slice);
} else {
if (info->bytes_transfered != 0) {
sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
gpr_slice_buffer_add(tcp->read_slices, sub);
success = 1;
} else {
gpr_slice_unref(tcp->read_slice);
success = 0;
if (info->bytes_transfered != 0) {
sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
gpr_slice_buffer_add(tcp->read_slices, sub);
success = 1;
} else {
gpr_slice_unref(tcp->read_slice);
success = 0;
}
}
}
tcp->socket->read_info.outstanding = 0;
return success;
}
@@ -209,14 +189,10 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
DWORD flags = 0;
WSABUF buffer;
GPR_ASSERT(!tcp->socket->read_info.outstanding);
if (tcp->shutting_down) {
return GRPC_ENDPOINT_ERROR;
}
TCP_REF(tcp, "read");
tcp->socket->read_info.outstanding = 1;
tcp->read_cb = cb;
tcp->read_slices = read_slices;
gpr_slice_buffer_reset_and_unref(read_slices);
@@ -236,10 +212,11 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
int ok;
info->bytes_transfered = bytes_read;
ok = on_read(tcp, 1);
TCP_UNREF(tcp, "read");
return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
}
TCP_REF(tcp, "read");
/* Otherwise, let's retry, by queuing a read. */
memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
@@ -260,52 +237,31 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
}
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_write(void *tcpp, int from_iocp) {
static void on_write(void *tcpp, int success) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_iomgr_closure *cb;
int success;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
cb = tcp->write_cb;
tcp->write_cb = NULL;
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
though, so we're going to do it from here. */
do_abort = 1;
}
gpr_mu_unlock(&tcp->mu);
if (do_abort) {
if (from_iocp) {
tcp->socket->write_info.outstanding = 0;
}
TCP_UNREF(tcp, "write");
if (cb) {
cb->cb(cb->cb_arg, 0);
}
return;
}
GPR_ASSERT(tcp->socket->write_info.outstanding);
if (info->wsa_error != 0) {
if (info->wsa_error != WSAECONNRESET) {
char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message);
if (success) {
if (info->wsa_error != 0) {
if (info->wsa_error != WSAECONNRESET) {
char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message);
}
success = 0;
} else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
}
success = 0;
} else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
success = 1;
}
tcp->socket->write_info.outstanding = 0;
TCP_UNREF(tcp, "write");
cb->cb(cb->cb_arg, success);
}
@@ -324,13 +280,10 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
WSABUF *allocated = NULL;
WSABUF *buffers = local_buffers;
GPR_ASSERT(!tcp->socket->write_info.outstanding);
if (tcp->shutting_down) {
return GRPC_ENDPOINT_ERROR;
}
TCP_REF(tcp, "write");
tcp->socket->write_info.outstanding = 1;
tcp->write_cb = cb;
tcp->write_slices = slices;
@@ -365,11 +318,11 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
}
}
if (allocated) gpr_free(allocated);
tcp->socket->write_info.outstanding = 0;
TCP_UNREF(tcp, "write");
return ret;
}
TCP_REF(tcp, "write");
/* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
operation, this time asynchronously. */
memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
@@ -380,7 +333,6 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
tcp->socket->write_info.outstanding = 0;
TCP_UNREF(tcp, "write");
return GRPC_ENDPOINT_ERROR;
}
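
The removed outstanding booleans are replaced by a single invariant: every posted overlapped operation holds one reference on the endpoint (TCP_REF before queuing, TCP_UNREF when the completion callback runs), so tcp_free() cannot run while an op is in flight. A sketch of that invariant under assumed names; tcp_ref/tcp_unref here are simplified stand-ins for the TCP_REF/TCP_UNREF macros:

    #include <windows.h>

    /* sketch_tcp is a hypothetical stand-in for grpc_tcp. */
    typedef struct sketch_tcp {
      volatile LONG refcount;
      /* ... socket, slices, callbacks ... */
    } sketch_tcp;

    static void tcp_ref(sketch_tcp *t) { InterlockedIncrement(&t->refcount); }

    static void tcp_unref(sketch_tcp *t) {
      if (InterlockedDecrement(&t->refcount) == 0) {
        /* last reference: destroy the socket and free t */
      }
    }

    static void start_read(sketch_tcp *t) {
      tcp_ref(t); /* pin the endpoint for the lifetime of the op */
      /* ... WSARecv() with an OVERLAPPED ... */
    }

    static void on_read_complete(sketch_tcp *t, int success) {
      /* ... deliver slices or report the error ... */
      tcp_unref(t); /* the completion drops the op's reference */
    }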

@@ -293,6 +293,9 @@ struct grpc_chttp2_transport {
gpr_refcount refs;
char *peer_string;
/** when this drops to zero it's safe to shutdown the endpoint */
gpr_refcount shutdown_ep_refs;
gpr_mu mu;
/** is the transport destroying itself? */

@@ -222,6 +222,8 @@ static void init_transport(grpc_chttp2_transport *t,
t->ep = ep;
/* one ref is for destroy, the other for when ep becomes NULL */
gpr_ref_init(&t->refs, 2);
/* ref is dropped at transport close() */
gpr_ref_init(&t->shutdown_ep_refs, 1);
gpr_mu_init(&t->mu);
grpc_mdctx_ref(mdctx);
t->peer_string = grpc_endpoint_get_peer(ep);
@@ -336,13 +338,26 @@ static void destroy_transport(grpc_transport *gt) {
UNREF_TRANSPORT(t, "destroy");
}
/** block grpc_endpoint_shutdown being called until a paired
allow_endpoint_shutdown is made */
static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
GPR_ASSERT(t->shutdown_ep_refs.count);
gpr_ref(&t->shutdown_ep_refs);
}
static void allow_endpoint_shutdown(grpc_chttp2_transport *t) {
if (gpr_unref(&t->shutdown_ep_refs)) {
grpc_endpoint_shutdown(t->ep);
}
}
static void close_transport_locked(grpc_chttp2_transport *t) {
if (!t->closed) {
t->closed = 1;
connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE,
"close_transport");
if (t->ep) {
grpc_endpoint_shutdown(t->ep);
allow_endpoint_shutdown(t);
}
}
}
@@ -471,6 +486,7 @@ static void unlock(grpc_chttp2_transport *t) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
prevent_endpoint_shutdown(t);
}
run_closures = t->global.pending_closures_head;
@@ -536,6 +552,7 @@ void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
static void writing_action(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
grpc_chttp2_perform_writes(&t->writing, t->ep);
allow_endpoint_shutdown(t);
}
void grpc_chttp2_add_incoming_goaway(
@@ -1104,21 +1121,28 @@ static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
read_error_locked(t);
} else {
keep_reading = 1;
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
unlock(t);
if (keep_reading) {
int ret = -1;
switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
case GRPC_ENDPOINT_DONE:
*success = 1;
return 1;
ret = 1;
break;
case GRPC_ENDPOINT_ERROR:
*success = 0;
return 1;
ret = 1;
break;
case GRPC_ENDPOINT_PENDING:
return 0;
ret = 0;
break;
}
allow_endpoint_shutdown(t);
return ret;
} else {
UNREF_TRANSPORT(t, "recv_data");
return 0;
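
The shutdown management fix gates grpc_endpoint_shutdown() behind the new shutdown_ep_refs count: the transport starts with one reference (released by close_transport_locked), and every write or pending read takes a paired prevent/allow reference, so the endpoint is only shut down once it is both closed and idle. Condensed from the hunks above into a compilable sketch (gpr_unref() returns nonzero when the count reaches zero); sketch_transport is a stand-in for grpc_chttp2_transport:

    #include <grpc/support/sync.h>

    typedef struct {
      gpr_refcount shutdown_ep_refs;
      /* ... endpoint, state ... */
    } sketch_transport;

    static void transport_init(sketch_transport *t) {
      gpr_ref_init(&t->shutdown_ep_refs, 1); /* the "close" reference */
    }

    static void prevent_endpoint_shutdown(sketch_transport *t) {
      gpr_ref(&t->shutdown_ep_refs); /* I/O starting against the endpoint */
    }

    static void allow_endpoint_shutdown(sketch_transport *t) {
      if (gpr_unref(&t->shutdown_ep_refs)) {
        /* count hit zero: close was requested and no I/O is mid-flight */
        /* grpc_endpoint_shutdown(t->ep); */
      }
    }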

@@ -33,13 +33,6 @@
#include "src/core/iomgr/tcp_posix.h"
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

@@ -39,6 +39,7 @@
#include <grpc/support/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/util/test_config.h"
/*
@@ -128,6 +129,7 @@ struct read_and_write_test_state {
static void read_and_write_test_read_handler(void *data, int success) {
struct read_and_write_test_state *state = data;
loop:
state->bytes_read += count_slices(
state->incoming.slices, state->incoming.count, &state->current_read_data);
if (state->bytes_read == state->target_bytes || !success) {
@@ -140,11 +142,11 @@ static void read_and_write_test_read_handler(void *data, int success) {
switch (grpc_endpoint_read(state->read_ep, &state->incoming,
&state->done_read)) {
case GRPC_ENDPOINT_ERROR:
read_and_write_test_read_handler(data, 0);
break;
success = 0;
goto loop;
case GRPC_ENDPOINT_DONE:
read_and_write_test_read_handler(data, 1);
break;
success = 1;
goto loop;
case GRPC_ENDPOINT_PENDING:
break;
}
@@ -176,16 +178,17 @@ static void read_and_write_test_write_handler(void *data, int success) {
gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
&state->done_write);
gpr_log(GPR_DEBUG, "write_status=%d", write_status);
GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
free(slices);
if (write_status == GRPC_ENDPOINT_PENDING) {
return;
} else if (write_status == GRPC_ENDPOINT_ERROR) {
goto cleanup;
}
}
GPR_ASSERT(state->bytes_written == state->target_bytes);
}
cleanup:
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1 + success;
@@ -204,6 +207,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_endpoint_test_fixture f =
begin_test(config, "read_and_write_test", slice_size);
gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d",
num_bytes, write_size, slice_size, shutdown);
if (shutdown) {
gpr_log(GPR_INFO, "Start read and write shutdown test");
@@ -264,11 +269,11 @@ static void read_and_write_test(grpc_endpoint_test_config config,
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
end_test(config);
gpr_slice_buffer_destroy(&state.outgoing);
gpr_slice_buffer_destroy(&state.incoming);
end_test(config);
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
}
struct timeout_test_state {
@@ -286,6 +291,7 @@ static void shutdown_during_write_test_read_handler(void *user_data,
int success) {
shutdown_during_write_test_state *st = user_data;
loop:
if (!success) {
grpc_endpoint_destroy(st->ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@@ -297,11 +303,11 @@ static void shutdown_during_write_test_read_handler(void *user_data,
case GRPC_ENDPOINT_PENDING:
break;
case GRPC_ENDPOINT_ERROR:
shutdown_during_write_test_read_handler(user_data, 0);
break;
success = 0;
goto loop;
case GRPC_ENDPOINT_DONE:
shutdown_during_write_test_read_handler(user_data, 1);
break;
success = 1;
goto loop;
}
}
}
@@ -324,86 +330,15 @@ static void shutdown_during_write_test_write_handler(void *user_data,
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
static void shutdown_during_write_test(grpc_endpoint_test_config config,
size_t slice_size) {
/* test that shutdown with a pending write creates no leaks */
gpr_timespec deadline;
size_t size;
size_t nblocks;
int current_data = 1;
shutdown_during_write_test_state read_st;
shutdown_during_write_test_state write_st;
gpr_slice *slices;
gpr_slice_buffer outgoing;
grpc_iomgr_closure done_write;
grpc_endpoint_test_fixture f =
begin_test(config, "shutdown_during_write_test", slice_size);
gpr_log(GPR_INFO, "testing shutdown during a write");
read_st.ep = f.client_ep;
write_st.ep = f.server_ep;
read_st.done = 0;
write_st.done = 0;
grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
&write_st);
grpc_iomgr_closure_init(&read_st.done_read,
shutdown_during_write_test_read_handler, &read_st);
gpr_slice_buffer_init(&read_st.incoming);
gpr_slice_buffer_init(&outgoing);
GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
&read_st.done_read) == GRPC_ENDPOINT_PENDING);
for (size = 1;; size *= 2) {
slices = allocate_blocks(size, 1, &nblocks, &current_data);
gpr_slice_buffer_reset_and_unref(&outgoing);
gpr_slice_buffer_addn(&outgoing, slices, nblocks);
switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
case GRPC_ENDPOINT_DONE:
break;
case GRPC_ENDPOINT_ERROR:
gpr_log(GPR_ERROR, "error writing");
abort();
case GRPC_ENDPOINT_PENDING:
grpc_endpoint_shutdown(write_st.ep);
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!write_st.done) {
grpc_pollset_worker worker;
GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
grpc_endpoint_destroy(write_st.ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!read_st.done) {
grpc_pollset_worker worker;
GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
gpr_free(slices);
gpr_slice_buffer_destroy(&read_st.incoming);
gpr_slice_buffer_destroy(&outgoing);
end_test(config);
return;
}
gpr_free(slices);
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
}
void grpc_endpoint_tests(grpc_endpoint_test_config config,
grpc_pollset *pollset) {
size_t i;
g_pollset = pollset;
read_and_write_test(config, 10000000, 100000, 8192, 0);
read_and_write_test(config, 1000000, 100000, 1, 0);
read_and_write_test(config, 100000000, 100000, 1, 1);
shutdown_during_write_test(config, 1000);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
read_and_write_test(config, 40320, i, i, 0);
}
g_pollset = NULL;
}
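
The loop:/goto rewrite in the two read handlers replaces tail self-calls: when grpc_endpoint_read() keeps completing synchronously (GRPC_ENDPOINT_DONE or GRPC_ENDPOINT_ERROR), the old handlers re-invoked themselves once per result and could grow the stack without bound, while the new form re-enters at the top of the same frame. A generic before/after sketch of that transformation, with hypothetical try_read()/consume() helpers that are assumptions of this illustration:

    typedef enum { DONE, ERROR, PENDING } op_status;
    typedef struct state state;
    op_status try_read(state *s);        /* hypothetical: starts or retries a read */
    void consume(state *s, int success); /* hypothetical: accounts for the result */

    /* Before: one stack frame per synchronously completed read. */
    void on_read_recursive(state *s, int success) {
      consume(s, success);
      switch (try_read(s)) {
        case DONE:    on_read_recursive(s, 1); break;
        case ERROR:   on_read_recursive(s, 0); break;
        case PENDING: break; /* the poller will call us again */
      }
    }

    /* After: identical control flow, constant stack depth. */
    void on_read_looping(state *s, int success) {
    loop:
      consume(s, success);
      switch (try_read(s)) {
        case DONE:    success = 1; goto loop;
        case ERROR:   success = 0; goto loop;
        case PENDING: break;
      }
    }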
