Getting stuff working

pull/3423/head
Craig Tiller 10 years ago
parent 8f3addcc4d
commit 3cd6a5158d
  1. 4
      src/core/client_config/lb_policies/pick_first.c
  2. 4
      src/core/client_config/resolvers/dns_resolver.c
  3. 4
      src/core/client_config/resolvers/sockaddr_resolver.c
  4. 4
      src/core/client_config/subchannel.c
  5. 2
      src/core/httpcli/httpcli.c
  6. 4
      src/core/iomgr/fd_posix.c
  7. 2
      src/core/iomgr/iomgr.c
  8. 2
      src/core/iomgr/tcp_server_posix.c
  9. 2
      src/core/iomgr/udp_server.c
  10. 15
      src/core/iomgr/workqueue.h
  11. 79
      src/core/iomgr/workqueue_posix.c
  12. 7
      src/core/surface/server.c
  13. 5
      src/core/transport/chttp2_transport.c
  14. 3
      src/core/transport/connectivity_state.c
  15. 2
      test/core/bad_client/bad_client.c
  16. 2
      test/core/end2end/fixtures/h2_sockpair+trace.c
  17. 3
      test/core/end2end/fixtures/h2_sockpair.c
  18. 2
      test/core/end2end/fixtures/h2_sockpair_1byte.c
  19. 2
      test/core/iomgr/endpoint_pair_test.c
  20. 2
      test/core/iomgr/fd_conservation_posix_test.c
  21. 2
      test/core/iomgr/fd_posix_test.c
  22. 2
      test/core/iomgr/tcp_client_posix_test.c
  23. 2
      test/core/iomgr/tcp_posix_test.c
  24. 2
      test/core/iomgr/workqueue_test.c
  25. 2
      test/core/security/secure_endpoint_test.c

@ -104,7 +104,7 @@ void pf_destroy(grpc_lb_policy *pol) {
grpc_connectivity_state_destroy(&p->state_tracker);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
grpc_workqueue_unref(p->workqueue);
GRPC_WORKQUEUE_UNREF(p->workqueue, "pick_first");
gpr_free(p);
}
@ -331,7 +331,7 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
p->num_subchannels = args->num_subchannels;
p->workqueue = args->workqueue;
grpc_workqueue_ref(p->workqueue);
GRPC_WORKQUEUE_REF(p->workqueue, "pick_first");
grpc_connectivity_state_init(&p->state_tracker, args->workqueue,
GRPC_CHANNEL_IDLE, "pick_first");
memcpy(p->subchannels, args->subchannels,

@ -197,7 +197,7 @@ static void dns_destroy(grpc_resolver *gr) {
grpc_client_config_unref(r->resolved_config);
}
grpc_subchannel_factory_unref(r->subchannel_factory);
grpc_workqueue_unref(r->workqueue);
GRPC_WORKQUEUE_UNREF(r->workqueue, "dns");
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
@ -227,7 +227,7 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
r->workqueue = args->workqueue;
grpc_workqueue_ref(r->workqueue);
GRPC_WORKQUEUE_REF(r->workqueue, "dns");
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}

@ -157,7 +157,7 @@ static void sockaddr_destroy(grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
grpc_workqueue_unref(r->workqueue);
GRPC_WORKQUEUE_UNREF(r->workqueue, "sockaddr");
gpr_free(r->addrs);
gpr_free(r->addrs_len);
gpr_free(r->lb_policy_name);
@ -331,7 +331,7 @@ static grpc_resolver *sockaddr_create(
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
r->workqueue = args->workqueue;
grpc_workqueue_ref(r->workqueue);
GRPC_WORKQUEUE_REF(r->workqueue, "sockaddr");
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;

@ -260,7 +260,7 @@ static void subchannel_destroy(grpc_subchannel *c) {
grpc_mdctx_unref(c->mdctx);
grpc_connectivity_state_destroy(&c->state_tracker);
grpc_connector_unref(c->connector);
grpc_workqueue_unref(c->workqueue);
GRPC_WORKQUEUE_UNREF(c->workqueue, "subchannel");
gpr_free(c);
}
@ -298,7 +298,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->mdctx = args->mdctx;
c->master = args->master;
c->workqueue = grpc_channel_get_workqueue(c->master);
grpc_workqueue_ref(c->workqueue);
GRPC_WORKQUEUE_REF(c->workqueue, "subchannel");
c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
c->random = random_seed();
grpc_mdctx_ref(c->mdctx);

@ -106,7 +106,7 @@ static void finish(internal_request *req, int success) {
grpc_iomgr_unregister_object(&req->iomgr_obj);
gpr_slice_buffer_destroy(&req->incoming);
gpr_slice_buffer_destroy(&req->outgoing);
grpc_workqueue_unref(req->workqueue);
GRPC_WORKQUEUE_UNREF(req->workqueue, "destroy");
gpr_free(req);
}

@ -72,7 +72,7 @@ static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
if (fd->workqueue->wakeup_read_fd != fd) {
grpc_workqueue_unref(fd->workqueue);
GRPC_WORKQUEUE_UNREF(fd->workqueue, "fd");
}
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@ -167,7 +167,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) {
/* if the wakeup_read_fd is NULL, then the workqueue is under construction
==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */
if (workqueue->wakeup_read_fd != NULL) {
grpc_workqueue_ref(workqueue);
GRPC_WORKQUEUE_REF(workqueue, "fd");
}
grpc_iomgr_register_object(&r->iomgr_object, name);
return r;

@ -126,8 +126,6 @@ void grpc_iomgr_shutdown(void) {
}
gpr_mu_unlock(&g_mu);
memset(&g_root_object, 0, sizeof(g_root_object));
grpc_alarm_list_shutdown();
grpc_iomgr_platform_shutdown();

@ -151,7 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) {
gpr_mu_destroy(&s->mu);
gpr_free(s->ports);
grpc_workqueue_unref(s->workqueue);
GRPC_WORKQUEUE_UNREF(s->workqueue, "destroy");
gpr_free(s);
}

@ -144,7 +144,7 @@ static void finish_shutdown(grpc_udp_server *s) {
gpr_cv_destroy(&s->cv);
gpr_free(s->ports);
grpc_workqueue_unref(s->workqueue);
GRPC_WORKQUEUE_UNREF(s->workqueue, "workqueue");
gpr_free(s);
}

@ -52,8 +52,23 @@ typedef struct grpc_workqueue grpc_workqueue;
/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(void);
void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously);
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
#define GRPC_WORKQUEUE_UNREF(p, r) \
grpc_workqueue_unref((p), __FILE__, __LINE__, (r))
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason);
void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason);
#else
#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
#define GRPC_WORKQUEUE_UNREF(p, r) grpc_workqueue_unref((p))
void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_workqueue *workqueue);
#endif
/** Bind this workqueue to a pollset */
void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,

@ -35,12 +35,15 @@
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/workqueue.h"
#include <stdio.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include "src/core/iomgr/fd_posix.h"
static void on_readable(void *arg, int success);
@ -61,15 +64,81 @@ grpc_workqueue *grpc_workqueue_create(void) {
return workqueue;
}
static void shutdown_thread(void *arg) {
grpc_iomgr_closure *todo = arg;
while (todo) {
grpc_iomgr_closure *next = todo->next;
todo->cb(todo->cb_arg, todo->success);
todo = next;
}
}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static size_t count_waiting(grpc_workqueue *workqueue) {
size_t i = 0;
grpc_iomgr_closure *c;
for (c = workqueue->head.next; c; c = c->next) {
i++;
}
return i;
}
#endif
void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) {
grpc_iomgr_closure *todo;
gpr_thd_id thd;
gpr_mu_lock(&workqueue->mu);
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue,
count_waiting(workqueue),
asynchronously ? "asynchronously" : "synchronously");
#endif
todo = workqueue->head.next;
workqueue->head.next = NULL;
workqueue->tail = &workqueue->head;
gpr_mu_unlock(&workqueue->mu);
if (todo != NULL) {
if (asynchronously) {
gpr_thd_new(&thd, shutdown_thread, todo, NULL);
} else {
while (todo) {
grpc_iomgr_closure *next = todo->next;
todo->cb(todo->cb_arg, todo->success);
todo = next;
}
}
}
}
static void workqueue_destroy(grpc_workqueue *workqueue) {
GPR_ASSERT(workqueue->tail == &workqueue->head);
grpc_fd_shutdown(workqueue->wakeup_read_fd);
}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s",
workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1,
reason);
#else
void grpc_workqueue_ref(grpc_workqueue *workqueue) {
#endif
gpr_ref(&workqueue->refs);
}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s",
workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1,
reason);
#else
void grpc_workqueue_unref(grpc_workqueue *workqueue) {
#endif
if (gpr_unref(&workqueue->refs)) {
workqueue_destroy(workqueue);
}
@ -94,6 +163,10 @@ static void on_readable(void *arg, int success) {
return;
} else {
gpr_mu_lock(&workqueue->mu);
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
count_waiting(workqueue));
#endif
todo = workqueue->head.next;
workqueue->head.next = NULL;
workqueue->tail = &workqueue->head;
@ -119,6 +192,10 @@ void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
}
workqueue->tail->next = closure;
workqueue->tail = closure;
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
count_waiting(workqueue));
#endif
gpr_mu_unlock(&workqueue->mu);
}

@ -365,7 +365,7 @@ static void server_delete(grpc_server *server) {
}
request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
grpc_workqueue_unref(server->workqueue);
GRPC_WORKQUEUE_UNREF(server->workqueue, "destroy");
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@ -392,6 +392,7 @@ static void orphan_channel(channel_data *chand) {
static void finish_destroy_channel(void *cd, int success) {
channel_data *chand = cd;
grpc_server *server = chand->server;
gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel);
GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
server_unref(server);
}
@ -404,6 +405,8 @@ static void destroy_channel(channel_data *chand) {
maybe_finish_shutdown(chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
gpr_log(GPR_DEBUG, "queue finish_destroy_channel: %p on %p", chand->channel,
chand->server->workqueue);
grpc_workqueue_push(chand->server->workqueue,
&chand->finish_destroy_channel_closure, 1);
}
@ -1074,6 +1077,8 @@ void grpc_server_destroy(grpc_server *server) {
gpr_mu_unlock(&server->mu_global);
grpc_workqueue_flush(server->workqueue, 0);
server_unref(server);
}

@ -242,8 +242,9 @@ static void init_transport(grpc_chttp2_transport *t,
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
grpc_connectivity_state_init(&t->channel_callback.state_tracker, workqueue,
GRPC_CHANNEL_READY, "transport");
grpc_connectivity_state_init(
&t->channel_callback.state_tracker, workqueue, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
gpr_slice_buffer_init(&t->global.qbuf);

@ -61,6 +61,8 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
const char *name) {
tracker->current_state = init_state;
tracker->watchers = NULL;
tracker->workqueue = workqueue;
GRPC_WORKQUEUE_REF(workqueue, name);
tracker->name = gpr_strdup(name);
}
@ -79,6 +81,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
grpc_workqueue_push(tracker->workqueue, w->notify, success);
gpr_free(w);
}
GRPC_WORKQUEUE_UNREF(tracker->workqueue, tracker->name);
gpr_free(tracker->name);
}

@ -168,6 +168,6 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
grpc_completion_queue_destroy(a.cq);
gpr_slice_buffer_destroy(&outgoing);
grpc_workqueue_unref(workqueue);
GRPC_WORKQUEUE_UNREF(workqueue, "destroy");
grpc_shutdown();
}

@ -171,7 +171,7 @@ int main(int argc, char **argv) {
grpc_end2end_tests(configs[i]);
}
grpc_workqueue_unref(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_shutdown();
return 0;

@ -157,7 +157,8 @@ int main(int argc, char **argv) {
grpc_end2end_tests(configs[i]);
}
grpc_workqueue_unref(g_workqueue);
grpc_workqueue_flush(g_workqueue, 1);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_shutdown();
return 0;

@ -157,7 +157,7 @@ int main(int argc, char **argv) {
grpc_end2end_tests(configs[i]);
}
grpc_workqueue_unref(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_shutdown();
return 0;

@ -74,7 +74,7 @@ int main(int argc, char **argv) {
g_workqueue = grpc_workqueue_create();
grpc_endpoint_tests(configs[0], &g_pollset);
grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_workqueue_unref(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_shutdown();
return 0;

@ -61,7 +61,7 @@ int main(int argc, char **argv) {
grpc_endpoint_destroy(p.server);
}
grpc_workqueue_unref(workqueue);
GRPC_WORKQUEUE_UNREF(workqueue, "destroy");
grpc_iomgr_shutdown();
return 0;
}

@ -495,7 +495,7 @@ int main(int argc, char **argv) {
test_grpc_fd();
test_grpc_fd_change();
grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_workqueue_unref(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_iomgr_shutdown();
return 0;
}

@ -247,7 +247,7 @@ int main(int argc, char **argv) {
test_times_out();
grpc_pollset_set_destroy(&g_pollset_set);
grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_workqueue_unref(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_shutdown();
return 0;
}

@ -479,7 +479,7 @@ int main(int argc, char **argv) {
grpc_pollset_init(&g_pollset);
run_tests();
grpc_endpoint_tests(configs[0], &g_pollset);
grpc_workqueue_unref(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_shutdown();

@ -66,7 +66,7 @@ static void test_add_closure(void) {
GPR_ASSERT(done);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_workqueue_unref(wq);
GRPC_WORKQUEUE_UNREF(wq, "destroy");
}
static void done_shutdown(void *arg) { grpc_pollset_destroy(arg); }

@ -170,7 +170,7 @@ int main(int argc, char **argv) {
grpc_pollset_init(&g_pollset);
grpc_endpoint_tests(configs[0], &g_pollset);
test_leftover(configs[1], 1);
grpc_workqueue_unref(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_shutdown();

Loading…
Cancel
Save