Wait for name resolutions to complete before shutting down iomgr.

This at least avoids crashing when iomgr is shut down while a name resolution
is still in flight. There is still a memory leak to track down before I add a
test that exercises this shutdown path.

This CL also makes some tiny cleanups and debuggability improvements.
	Change on 2014/12/11 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81882486
pull/1/merge
ctiller 10 years ago committed by Jan Tattermusch
parent 465554e5b6
commit ccd27fd9b2
  1. 2
      src/core/channel/client_setup.c
  2. 32
      src/core/iomgr/iomgr_libevent.c
  3. 2
      src/core/iomgr/iomgr_libevent.h
  4. 9
      src/core/iomgr/resolve_address_posix.c
  5. 8
      src/core/iomgr/tcp_client_posix.c

@ -211,7 +211,7 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r,
if (retry) {
/* TODO(klempner): Replace these values with further consideration. 2x is
probably too aggressive of a backoff. */
gpr_timespec max_backoff = gpr_time_from_micros(120000000);
gpr_timespec max_backoff = gpr_time_from_minutes(2);
GPR_ASSERT(!s->in_alarm);
s->in_alarm = 1;
grpc_alarm_init(&s->backoff_alarm, backoff_alarm_done, s);

@ -59,6 +59,7 @@ gpr_cv grpc_iomgr_cv;
static grpc_libevent_activation_data *g_activation_queue;
static int g_num_pollers;
static int g_num_fds;
static int g_num_address_resolutions;
static gpr_timespec g_last_poll_completed;
static int g_shutdown_backup_poller;
static gpr_event g_backup_poller_done;
@ -69,6 +70,18 @@ static grpc_fd *g_fds_to_free;
int evthread_use_threads(void);
static void grpc_fd_impl_destroy(grpc_fd *impl);
/* Adds delta to the number of in-flight address resolutions
   (g_num_address_resolutions) while holding grpc_iomgr_mu. Callers in this
   change pass +1 when a resolution is started and -1 once its callback has
   run. Asserts that g_shutdown_backup_poller is not set. */
void grpc_iomgr_ref_address_resolution(int delta) {
gpr_mu_lock(&grpc_iomgr_mu);
/* log the count as it was before delta is applied */
gpr_log(GPR_DEBUG, "num_address_resolutions = %d + %d",
g_num_address_resolutions, delta);
GPR_ASSERT(!g_shutdown_backup_poller);
g_num_address_resolutions += delta;
if (0 == g_num_address_resolutions) {
/* wake waiters on grpc_iomgr_cv; grpc_iomgr_shutdown waits on it until the
   resolution count drops to zero */
gpr_cv_broadcast(&grpc_iomgr_cv);
}
gpr_mu_unlock(&grpc_iomgr_mu);
}
/* If anything is in the work queue, process one item and return 1.
Return 0 if there were no work items to complete.
Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
@ -86,6 +99,10 @@ static int maybe_do_queue_work() {
g_activation_queue;
}
work->next = work->prev = NULL;
/* force status to cancelled from ok when shutting down */
if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) {
work->status = GRPC_CALLBACK_CANCELLED;
}
gpr_mu_unlock(&grpc_iomgr_mu);
work->cb(work->arg, work->status);
@ -225,6 +242,7 @@ void grpc_iomgr_init() {
g_activation_queue = NULL;
g_num_pollers = 0;
g_num_fds = 0;
g_num_address_resolutions = 0;
g_last_poll_completed = gpr_now();
g_shutdown_backup_poller = 0;
g_fds_to_free = NULL;
@ -256,17 +274,19 @@ void grpc_iomgr_shutdown() {
/* broadcast shutdown */
gpr_mu_lock(&grpc_iomgr_mu);
while (g_num_fds) {
while (g_num_fds > 0 || g_num_address_resolutions > 0) {
gpr_log(GPR_INFO,
"waiting for %d fds to be destroyed before closing event manager",
g_num_fds);
"waiting for %d fds and %d name resolutions to be destroyed before "
"closing event manager",
g_num_fds, g_num_address_resolutions);
if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) {
gpr_log(GPR_ERROR,
"not all fds destroyed before shutdown deadline: memory leaks "
"not all fds or name resolutions destroyed before shutdown "
"deadline: memory leaks "
"are likely");
break;
} else if (g_num_fds == 0) {
gpr_log(GPR_INFO, "all fds closed");
} else if (g_num_fds == 0 && g_num_address_resolutions == 0) {
gpr_log(GPR_INFO, "all fds closed, all name resolutions finished");
}
}

@ -204,4 +204,6 @@ struct grpc_alarm {
gpr_atm triggered; /* To be used atomically if alarm triggered */
};
void grpc_iomgr_ref_address_resolution(int delta);
#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */

@ -41,6 +41,7 @@
#include <unistd.h>
#include <string.h>
#include "src/core/iomgr/iomgr_libevent.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/alloc.h>
@ -192,10 +193,15 @@ done:
/* Thread function that makes grpc_blocking_resolve_address asynchronous:
   performs the blocking lookup for the request passed in rp, releases the
   request, then delivers the result to the request's callback. */
static void do_request(void *rp) {
request *r = rp;
/* NOTE(review): the next statement looks like the pre-change line of this
   diff hunk; the statements after it appear to be its replacement - confirm
   against the committed file. */
r->cb(r->arg, grpc_blocking_resolve_address(r->name, r->default_port));
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
/* copy out what the callback needs so r can be freed before invoking it */
void *arg = r->arg;
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
cb(arg, resolved);
/* the resolution is only counted as finished once the callback has returned */
grpc_iomgr_ref_address_resolution(-1);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@ -207,6 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
grpc_iomgr_ref_address_resolution(1);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;

@ -109,6 +109,14 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) {
grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
return;
} else {
switch (so_error) {
case ECONNREFUSED:
gpr_log(GPR_ERROR, "socket error: connection refused");
break;
default:
gpr_log(GPR_ERROR, "socket error: %d", so_error);
break;
}
goto error;
}
} else {

Loading…
Cancel
Save