Label all iomgr objects

This allows a list of them to be dumped to isolate where memory leaks
are occuring.
pull/1856/head
Craig Tiller 10 years ago
parent 0bdfe8b147
commit fa275a97b9
  1. 3
      src/core/iomgr/endpoint_pair.h
  2. 17
      src/core/iomgr/endpoint_pair_posix.c
  3. 7
      src/core/iomgr/fd_posix.c
  4. 6
      src/core/iomgr/fd_posix.h
  5. 65
      src/core/iomgr/iomgr.c
  6. 10
      src/core/iomgr/iomgr_internal.h
  7. 9
      src/core/iomgr/resolve_address_posix.c
  8. 22
      src/core/iomgr/tcp_client_posix.c
  9. 23
      src/core/iomgr/tcp_server_posix.c
  10. 2
      test/core/bad_client/bad_client.c
  11. 2
      test/core/end2end/fixtures/chttp2_socket_pair.c
  12. 2
      test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
  13. 2
      test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c
  14. 8
      test/core/iomgr/fd_posix_test.c
  15. 16
      test/core/iomgr/tcp_posix_test.c
  16. 2
      test/core/security/secure_endpoint_test.c

@ -41,6 +41,7 @@ typedef struct {
grpc_endpoint *server;
} grpc_endpoint_pair;
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size);
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */

@ -44,6 +44,8 @@
#include <sys/socket.h>
#include "src/core/iomgr/tcp_posix.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
static void create_sockets(int sv[2]) {
@ -55,12 +57,21 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size);
p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size);
gpr_asprintf(&final_name, "%s:client", name);
p.client =
grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size);
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
p.server =
grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size);
gpr_free(final_name);
return p;
}

@ -41,7 +41,6 @@
#include <sys/socket.h>
#include <unistd.h>
#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@ -119,7 +118,7 @@ static void unref_by(grpc_fd *fd, int n) {
close(fd->fd);
grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
freelist_fd(fd);
grpc_iomgr_unref();
grpc_iomgr_unregister_object(&fd->iomgr_object);
} else {
GPR_ASSERT(old > n);
}
@ -138,9 +137,9 @@ void grpc_fd_global_shutdown(void) {
static void do_nothing(void *ignored, int success) {}
grpc_fd *grpc_fd_create(int fd) {
grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
grpc_iomgr_ref();
grpc_iomgr_register_object(&r->iomgr_object, name);
grpc_pollset_add_fd(grpc_backup_pollset(), r);
return r;
}

@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
@ -99,12 +99,14 @@ struct grpc_fd {
grpc_iomgr_cb_func on_done;
void *on_done_user_data;
struct grpc_fd *freelist_next;
grpc_iomgr_object iomgr_object;
};
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd);
grpc_fd *grpc_fd_create(int fd, const char *name);
/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.

@ -37,6 +37,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
@ -54,8 +55,8 @@ static gpr_cv g_rcv;
static delayed_callback *g_cbs_head = NULL;
static delayed_callback *g_cbs_tail = NULL;
static int g_shutdown;
static int g_refs;
static gpr_event g_background_callback_executor_done;
static grpc_iomgr_object g_root_object;
/* Execute followup callbacks continuously.
Other threads may check in and help during pollset_work() */
@ -96,34 +97,49 @@ void grpc_iomgr_init(void) {
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_alarm_list_init(gpr_now());
g_refs = 0;
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
gpr_event_init(&g_background_callback_executor_done);
gpr_thd_new(&id, background_callback_executor, NULL, NULL);
}
static size_t count_objects(void) {
grpc_iomgr_object *obj;
size_t n = 0;
for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
n++;
}
return n;
}
void grpc_iomgr_shutdown(void) {
delayed_callback *cb;
grpc_iomgr_object *obj;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
gpr_mu_lock(&g_mu);
g_shutdown = 1;
while (g_cbs_head || g_refs) {
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
while (g_cbs_head || g_root_object.next != &g_root_object) {
size_t nobjs = count_objects();
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", nobjs,
g_cbs_head ? " and executing final callbacks" : "");
while (g_cbs_head) {
cb = g_cbs_head;
g_cbs_head = cb->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
cb->cb(cb->cb_arg, 0);
gpr_free(cb);
gpr_mu_lock(&g_mu);
if (g_cbs_head) {
do {
cb = g_cbs_head;
g_cbs_head = cb->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
cb->cb(cb->cb_arg, 0);
gpr_free(cb);
gpr_mu_lock(&g_mu);
} while (g_cbs_head);
continue;
}
if (g_refs) {
if (nobjs > 0) {
int timeout = 0;
gpr_timespec short_deadline = gpr_time_add(gpr_now(),
gpr_time_from_millis(100));
@ -137,7 +153,10 @@ void grpc_iomgr_shutdown(void) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
g_refs);
count_objects());
for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s", obj->name);
}
break;
}
}
@ -153,17 +172,21 @@ void grpc_iomgr_shutdown(void) {
gpr_cv_destroy(&g_rcv);
}
void grpc_iomgr_ref(void) {
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) {
obj->name = gpr_strdup(name);
gpr_mu_lock(&g_mu);
++g_refs;
obj->next = &g_root_object;
obj->prev = obj->next->prev;
obj->next->prev = obj->prev->next = obj;
gpr_mu_unlock(&g_mu);
}
void grpc_iomgr_unref(void) {
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_free(obj->name);
gpr_mu_lock(&g_mu);
if (0 == --g_refs) {
gpr_cv_signal(&g_rcv);
}
obj->next->prev = obj->prev;
obj->prev->next = obj->next;
gpr_cv_signal(&g_rcv);
gpr_mu_unlock(&g_mu);
}

@ -38,12 +38,18 @@
#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/sync.h>
typedef struct grpc_iomgr_object {
char *name;
struct grpc_iomgr_object *next;
struct grpc_iomgr_object *prev;
} grpc_iomgr_object;
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
int success);
void grpc_iomgr_ref(void);
void grpc_iomgr_unref(void);
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name);
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);
void grpc_iomgr_platform_init(void);
void grpc_iomgr_platform_shutdown(void);

@ -55,6 +55,7 @@ typedef struct {
char *default_port;
grpc_resolve_cb cb;
void *arg;
grpc_iomgr_object iomgr_object;
} request;
grpc_resolved_addresses *grpc_blocking_resolve_address(
@ -153,9 +154,9 @@ static void do_request(void *rp) {
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
grpc_iomgr_unregister_object(&r->iomgr_object);
gpr_free(r);
cb(arg, resolved);
grpc_iomgr_unref();
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@ -167,7 +168,11 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
grpc_iomgr_ref();
char *tmp;
gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name,
default_port);
grpc_iomgr_register_object(&r->iomgr_object, tmp);
gpr_free(tmp);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;

@ -48,6 +48,7 @@
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@ -185,6 +186,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
async_connect *ac;
struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in addr4_copy;
char *name;
char *addr_str;
/* Use dualstack sockets where available. */
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
@ -211,24 +214,27 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
err = connect(fd, addr, addr_len);
} while (err < 0 && errno == EINTR);
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
if (err >= 0) {
gpr_log(GPR_DEBUG, "instant connect");
cb(arg,
grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
return;
cb(arg, grpc_tcp_create(grpc_fd_create(fd, name),
GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error: %s", strerror(errno));
gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
close(fd);
cb(arg, NULL);
return;
goto done;
}
ac = gpr_malloc(sizeof(async_connect));
ac->cb = cb;
ac->cb_arg = arg;
ac->fd = grpc_fd_create(fd);
ac->fd = grpc_fd_create(fd, name);
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->write_closure.cb = on_writable;
@ -236,6 +242,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
done:
gpr_free(name);
gpr_free(addr_str);
}
#endif

@ -60,6 +60,7 @@
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@ -281,6 +282,8 @@ static void on_read(void *arg, int success) {
for (;;) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
char *addr_str;
char *name;
/* Note: If we ever decide to return this address to the user, remember to
strip off the ::ffff:0.0.0.0/96 prefix first. */
int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1);
@ -299,9 +302,15 @@ static void on_read(void *arg, int success) {
grpc_set_socket_no_sigpipe_if_possible(fd);
sp->server->cb(
sp->server->cb_arg,
grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
sp->server->cb(sp->server->cb_arg,
grpc_tcp_create(grpc_fd_create(fd, name),
GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
gpr_free(addr_str);
gpr_free(name);
}
abort();
@ -318,9 +327,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr, int addr_len) {
server_port *sp;
int port;
char *addr_str;
char *name;
port = prepare_socket(fd, addr, addr_len);
if (port >= 0) {
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->cb && "must add ports before starting server");
/* append it to the list under a lock */
@ -331,11 +344,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd);
sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(addr_str);
gpr_free(name);
}
return port;

@ -88,7 +88,7 @@ void grpc_run_bad_client_test(const char *name, const char *client_payload,
grpc_init();
/* Create endpoints */
sfd = grpc_iomgr_create_endpoint_pair(65536);
sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
/* Create server, completion events */
a.server = grpc_server_create_from_filters(NULL, 0, NULL);

@ -98,7 +98,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.client_cq = grpc_completion_queue_create();
f.server_cq = grpc_completion_queue_create();
*sfd = grpc_iomgr_create_endpoint_pair(65536);
*sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
return f;
}

@ -98,7 +98,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.client_cq = grpc_completion_queue_create();
f.server_cq = grpc_completion_queue_create();
*sfd = grpc_iomgr_create_endpoint_pair(1);
*sfd = grpc_iomgr_create_endpoint_pair("fixture", 1);
return f;
}

@ -99,7 +99,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.client_cq = grpc_completion_queue_create();
f.server_cq = grpc_completion_queue_create();
*sfd = grpc_iomgr_create_endpoint_pair(65536);
*sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
return f;
}

@ -208,7 +208,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = gpr_malloc(sizeof(*se));
se->sv = sv;
se->em_fd = grpc_fd_create(fd);
se->em_fd = grpc_fd_create(fd, "listener");
se->session_read_closure.cb = session_read_cb;
se->session_read_closure.cb_arg = se;
grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
@ -236,7 +236,7 @@ static int server_start(server *sv) {
port = ntohs(sin.sin_port);
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
sv->em_fd = grpc_fd_create(fd);
sv->em_fd = grpc_fd_create(fd, "server");
/* Register to be interested in reading from listen_fd. */
sv->listen_closure.cb = listen_cb;
sv->listen_closure.cb_arg = sv;
@ -351,7 +351,7 @@ static void client_start(client *cl, int port) {
}
}
cl->em_fd = grpc_fd_create(fd);
cl->em_fd = grpc_fd_create(fd, "client");
client_session_write(cl, 1);
}
@ -447,7 +447,7 @@ static void test_grpc_fd_change(void) {
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
em_fd = grpc_fd_create(sv[0]);
em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
/* Register the first callback, then make its FD readable */
grpc_fd_notify_on_read(em_fd, &first_closure);

@ -172,7 +172,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size);
written_bytes = fill_socket_partial(sv[0], num_bytes);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@ -213,7 +213,7 @@ static void large_read_test(ssize_t slice_size) {
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size);
written_bytes = fill_socket(sv[0]);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@ -350,7 +350,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"),
GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
gpr_mu_init(&state.mu);
gpr_cv_init(&state.cv);
@ -406,7 +407,8 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
close(sv[0]);
gpr_mu_init(&state.mu);
@ -473,8 +475,10 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
grpc_endpoint_test_fixture f;
create_sockets(sv);
f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0]), slice_size);
f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
f.client_ep =
grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), slice_size);
f.server_ep =
grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size);
return f;
}

@ -51,7 +51,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
grpc_endpoint_test_fixture f;
grpc_endpoint_pair tcp;
tcp = grpc_iomgr_create_endpoint_pair(slice_size);
tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size);
if (leftover_nslices == 0) {
f.client_ep =

Loading…
Cancel
Save