avoid infinite spin without checking condvar

pull/3236/head
Craig Tiller 9 years ago
parent 29d4145a64
commit 49d03c80c9
  1. 18
      src/core/iomgr/iomgr.c
  2. 37
      src/core/iomgr/tcp_client_windows.c

@ -108,8 +108,14 @@ static size_t count_objects(void) {
return n;
}
void grpc_iomgr_shutdown(void) {
static void dump_objects(const char *kind) {
grpc_iomgr_object *obj;
for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
gpr_log(GPR_DEBUG, "%s OBJECT: %s %p", kind, obj->name, obj);
}
}
void grpc_iomgr_shutdown(void) {
grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN));
@ -151,22 +157,22 @@ void grpc_iomgr_shutdown(void) {
}
if (g_root_object.next != &g_root_object) {
int timeout = 0;
while (g_cbs_head == NULL) {
gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
timeout = 1;
break;
}
}
}
if (timeout) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
count_objects());
for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s %p", obj->name, obj);
}
dump_objects("LEAKED");
break;
}
}
@ -188,7 +194,7 @@ void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) {
obj->name = gpr_strdup(name);
gpr_mu_lock(&g_mu);
obj->next = &g_root_object;
obj->prev = obj->next->prev;
obj->prev = g_root_object.prev;
obj->next->prev = obj->prev->next = obj;
gpr_mu_unlock(&g_mu);
}

@ -60,13 +60,13 @@ typedef struct {
grpc_alarm alarm;
char *addr_name;
int refs;
int aborted;
} async_connect;
static void async_connect_cleanup(async_connect *ac) {
static void async_connect_unlock_and_cleanup(async_connect *ac) {
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
if (ac->socket != NULL) grpc_winsocket_destroy(ac->socket);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
@ -77,10 +77,11 @@ static void on_alarm(void *acp, int occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
/* If the alarm didn't occur, it got cancelled. */
gpr_log(GPR_DEBUG, "on_alarm: %p", ac->socket);
if (ac->socket != NULL && occured) {
grpc_winsocket_shutdown(ac->socket);
}
async_connect_cleanup(ac);
async_connect_unlock_and_cleanup(ac);
}
static void on_connect(void *acp, int from_iocp) {
@ -90,12 +91,12 @@ static void on_connect(void *acp, int from_iocp) {
grpc_winsocket_callback_info *info = &ac->socket->write_info;
void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg;
int aborted;
grpc_alarm_cancel(&ac->alarm);
gpr_mu_lock(&ac->mu);
aborted = ac->aborted;
gpr_log(GPR_DEBUG, "on_connect: %p", ac->socket);
if (from_iocp) {
DWORD transfered_bytes = 0;
@ -107,31 +108,16 @@ static void on_connect(void *acp, int from_iocp) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message);
} else if (!aborted) {
} else {
ep = grpc_tcp_create(ac->socket, ac->addr_name);
ac->socket = NULL;
}
} else {
gpr_log(GPR_ERROR, "on_connect is shutting down");
/* If the connection timeouts, we will still get a notification from
the IOCP whatever happens. So we're just going to flag that connection
as being in the process of being aborted, and wait for the IOCP. We
can't just orphan the socket now, because the IOCP might already have
gotten a successful connection, which is our worst-case scenario.
We need to call our callback now to respect the deadline. */
ac->aborted = 1;
gpr_mu_unlock(&ac->mu);
cb(cb_arg, NULL);
return;
}
/* If we don't have an endpoint, it means the connection failed,
so it doesn't matter if it aborted or failed. We need to orphan
that socket. */
if (!ep || aborted) grpc_winsocket_destroy(ac->socket);
async_connect_cleanup(ac);
async_connect_unlock_and_cleanup(ac);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
if (!aborted) cb(cb_arg, ep);
cb(cb_arg, ep);
}
/* Tries to issue one async connection, then schedules both an IOCP
@ -212,7 +198,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
gpr_now(GPR_CLOCK_MONOTONIC));
@ -223,7 +208,7 @@ failure:
utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message);
if (socket) {
if (socket != NULL) {
grpc_winsocket_destroy(socket);
} else if (sock != INVALID_SOCKET) {
closesocket(sock);

Loading…
Cancel
Save