Add debugging

pull/1888/head
Craig Tiller 10 years ago
parent 5ec3bfab9b
commit 463f23712f
  1. 7
      src/core/iomgr/pollset_posix.c
  2. 38
      src/core/surface/completion_queue.c
  3. 9
      src/core/surface/completion_queue.h
  4. 4
      src/core/surface/server.c

@ -134,6 +134,7 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
r = pollset->vtable->maybe_work(pollset, deadline, now, 1);
gpr_tls_set(&g_current_thread_poller, 0);
if (pollset->shutting_down) {
gpr_log(GPR_DEBUG, "shutting_down': cbs=%d ctr=%d", pollset->in_flight_cbs, pollset->counter);
if (pollset->counter > 0) {
grpc_pollset_kick(pollset);
} else if (pollset->in_flight_cbs == 0) {
@ -155,9 +156,11 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
int in_flight_cbs;
int counter;
gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
in_flight_cbs = pollset->in_flight_cbs;
counter = pollset->counter;
gpr_log(GPR_DEBUG, "shutting_down: cbs=%d ctr=%d", in_flight_cbs, counter);
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
if (counter > 0) {
@ -207,7 +210,7 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
while (pollset->counter != 0) {
if (pollset->counter != 0) {
grpc_pollset_kick(pollset);
grpc_iomgr_add_callback(basic_do_promote, up_args);
gpr_mu_unlock(&pollset->mu);
@ -344,10 +347,10 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
pfd[0].events = POLLIN;
pfd[0].revents = 0;
nfds = 1;
pollset->counter++;
if (fd) {
pfd[1].fd = fd->fd;
pfd[1].revents = 0;
pollset->counter++;
gpr_mu_unlock(&pollset->mu);
pfd[1].events =
grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);

@ -86,16 +86,38 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
return cc;
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason) {
gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc) {
#endif
gpr_ref(&cc->owning_refs);
}
static void on_pollset_destroy_done(void *arg) {
grpc_completion_queue *cc = arg;
grpc_cq_internal_unref(cc);
GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason) {
gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
#else
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->queue == NULL);
grpc_pollset_destroy(&cc->pollset);
@ -170,7 +192,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
event *ev = NULL;
grpc_event ret;
grpc_cq_internal_ref(cc);
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if (cc->queue != NULL) {
@ -202,7 +224,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
grpc_cq_internal_unref(cc);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
return ret;
}
}
@ -210,7 +232,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
grpc_cq_internal_unref(cc);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
return ret;
}
@ -248,7 +270,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
event *ev = NULL;
grpc_event ret;
grpc_cq_internal_ref(cc);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if ((ev = pluck_event(cc, tag))) {
@ -263,7 +285,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
grpc_cq_internal_unref(cc);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
return ret;
}
}
@ -271,7 +293,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
grpc_cq_internal_unref(cc);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
return ret;
}
@ -292,7 +314,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
}
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
grpc_cq_internal_unref(cc);
GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {

@ -39,8 +39,17 @@
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason);
void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason);
#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc, reason)
#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc, reason)
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc);
void grpc_cq_internal_unref(grpc_completion_queue *cc);
#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc)
#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc)
#endif
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */

@ -276,7 +276,7 @@ static void server_delete(grpc_server *server) {
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
grpc_cq_internal_unref(server->cqs[i]);
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
}
gpr_free(server->cqs);
gpr_free(server->pollsets);
@ -632,7 +632,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
grpc_cq_internal_ref(cq);
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
server->cq_count * sizeof(grpc_completion_queue *));

Loading…
Cancel
Save