Progress towards workqueue transition

pull/3423/head
Craig Tiller 10 years ago
parent 1701b09333
commit 06a43f5d7e
  1. 11
      src/core/client_config/lb_policies/pick_first.c
  2. 2
      src/core/client_config/lb_policy_factory.h
  3. 5
      src/core/client_config/resolver_factory.c
  4. 15
      src/core/client_config/resolver_factory.h
  5. 14
      src/core/client_config/resolver_registry.c
  6. 5
      src/core/client_config/resolver_registry.h
  7. 31
      src/core/client_config/resolvers/dns_resolver.c
  8. 30
      src/core/client_config/resolvers/sockaddr_resolver.c
  9. 3
      src/core/client_config/subchannel.c
  10. 3
      src/core/iomgr/endpoint_pair.h
  11. 11
      src/core/iomgr/endpoint_pair_posix.c
  12. 30
      src/core/iomgr/fd_posix.c
  13. 4
      src/core/iomgr/fd_posix.h
  14. 109
      src/core/iomgr/iomgr.c
  15. 2
      src/core/iomgr/pollset_multipoller_with_epoll.c
  16. 4
      src/core/iomgr/pollset_posix.c
  17. 1
      src/core/iomgr/tcp_client.h
  18. 15
      src/core/iomgr/workqueue_posix.c
  19. 15
      src/core/iomgr/workqueue_posix.h
  20. 11
      src/core/surface/secure_channel_create.c

@ -52,6 +52,8 @@ typedef struct {
/** all our subchannels */
grpc_subchannel **subchannels;
size_t num_subchannels;
/** workqueue for async work */
grpc_workqueue *workqueue;
grpc_iomgr_closure connectivity_changed;
@ -102,6 +104,7 @@ void pf_destroy(grpc_lb_policy *pol) {
grpc_connectivity_state_destroy(&p->state_tracker);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
grpc_workqueue_unref(p->workqueue);
gpr_free(p);
}
@ -114,7 +117,7 @@ void pf_shutdown(grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
grpc_workqueue_push(p->workqueue, pp->on_complete, 0);
gpr_free(pp);
}
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
@ -196,7 +199,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@ -241,7 +244,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
gpr_free(pp);
}
unref = 1;
@ -327,6 +330,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
p->num_subchannels = args->num_subchannels;
p->workqueue = args->workqueue;
grpc_workqueue_ref(p->workqueue);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"pick_first");
memcpy(p->subchannels, args->subchannels,

@ -36,6 +36,7 @@
#include "src/core/client_config/lb_policy.h"
#include "src/core/client_config/subchannel.h"
#include "src/core/iomgr/workqueue.h"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
@ -49,6 +50,7 @@ struct grpc_lb_policy_factory {
typedef struct grpc_lb_policy_args {
grpc_subchannel **subchannels;
size_t num_subchannels;
grpc_workqueue *workqueue;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {

@ -43,10 +43,9 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *factory) {
/** Create a resolver instance for a name */
grpc_resolver *grpc_resolver_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) {
grpc_resolver_factory *factory, grpc_resolver_args *args) {
if (factory == NULL) return NULL;
return factory->vtable->create_resolver(factory, uri, subchannel_factory);
return factory->vtable->create_resolver(factory, args);
}
char *grpc_resolver_factory_get_default_authority(

@ -37,6 +37,7 @@
#include "src/core/client_config/resolver.h"
#include "src/core/client_config/subchannel_factory.h"
#include "src/core/client_config/uri_parser.h"
#include "src/core/iomgr/workqueue.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable;
@ -47,14 +48,19 @@ struct grpc_resolver_factory {
const grpc_resolver_factory_vtable *vtable;
};
typedef struct grpc_resolver_args {
grpc_uri *uri;
grpc_subchannel_factory *subchannel_factory;
grpc_workqueue *workqueue;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {
void (*ref)(grpc_resolver_factory *factory);
void (*unref)(grpc_resolver_factory *factory);
/** Implementation of grpc_resolver_factory_create_resolver */
grpc_resolver *(*create_resolver)(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory);
grpc_resolver *(*create_resolver)(grpc_resolver_factory *factory,
grpc_resolver_args *args);
/** Implementation of grpc_resolver_factory_get_default_authority */
char *(*get_default_authority)(grpc_resolver_factory *factory, grpc_uri *uri);
@ -68,8 +74,7 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *resolver);
/** Create a resolver instance for a name */
grpc_resolver *grpc_resolver_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory);
grpc_resolver_factory *factory, grpc_resolver_args *args);
/** Return a (freshly allocated with gpr_malloc) string representing
the default authority to use for this scheme. */

@ -114,12 +114,18 @@ static grpc_resolver_factory *resolve_factory(const char *target,
return factory;
}
grpc_resolver *grpc_resolver_create(
const char *target, grpc_subchannel_factory *subchannel_factory) {
grpc_resolver *grpc_resolver_create(const char *target,
grpc_subchannel_factory *subchannel_factory,
grpc_workqueue *workqueue) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver =
grpc_resolver_factory_create_resolver(factory, uri, subchannel_factory);
grpc_resolver *resolver;
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
args.subchannel_factory = subchannel_factory;
args.workqueue = workqueue;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
}

@ -55,8 +55,9 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
If a resolver factory was found, use it to instantiate a resolver and
return it.
If a resolver factory was not found, return NULL. */
grpc_resolver *grpc_resolver_create(
const char *target, grpc_subchannel_factory *subchannel_factory);
grpc_resolver *grpc_resolver_create(const char *target,
grpc_subchannel_factory *subchannel_factory,
grpc_workqueue *workqueue);
/** Given a target, return a (freshly allocated with gpr_malloc) string
representing the default authority to pass from a client. */

@ -49,6 +49,8 @@ typedef struct {
grpc_resolver base;
/** refcount */
gpr_refcount refs;
/** workqueue */
grpc_workqueue *workqueue;
/** name to resolve */
char *name;
/** default port to use */
@ -94,7 +96,7 @@ static void dns_shutdown(grpc_resolver *resolver) {
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
grpc_iomgr_add_callback(r->next_completion);
grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@ -180,7 +182,7 @@ static void dns_maybe_finish_next_locked(dns_resolver *r) {
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
grpc_iomgr_add_callback(r->next_completion);
grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
@ -193,21 +195,21 @@ static void dns_destroy(grpc_resolver *gr) {
grpc_client_config_unref(r->resolved_config);
}
grpc_subchannel_factory_unref(r->subchannel_factory);
grpc_workqueue_unref(r->workqueue);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
static grpc_resolver *dns_create(
grpc_uri *uri, const char *default_port,
const char* lb_policy_name,
grpc_subchannel_factory *subchannel_factory) {
static grpc_resolver *dns_create(grpc_resolver_args *args,
const char *default_port,
const char *lb_policy_name) {
dns_resolver *r;
const char *path = uri->path;
const char *path = args->uri->path;
if (0 != strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
return NULL;
}
@ -220,8 +222,10 @@ static grpc_resolver *dns_create(
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
r->subchannel_factory = subchannel_factory;
grpc_subchannel_factory_ref(subchannel_factory);
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
r->workqueue = args->workqueue;
grpc_workqueue_ref(r->workqueue);
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
@ -235,9 +239,8 @@ static void dns_factory_ref(grpc_resolver_factory *factory) {}
static void dns_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *dns_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) {
return dns_create(uri, "https", "pick_first", subchannel_factory);
grpc_resolver_factory *factory, grpc_resolver_args *args) {
return dns_create(args, "https", "pick_first");
}
char *dns_factory_get_default_host_name(grpc_resolver_factory *factory,

@ -56,6 +56,8 @@ typedef struct {
gpr_refcount refs;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
/** workqueue */
grpc_workqueue *workqueue;
/** load balancing policy name */
char *lb_policy_name;
@ -96,8 +98,7 @@ static void sockaddr_shutdown(grpc_resolver *resolver) {
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
/* TODO(ctiller): add delayed callback */
grpc_iomgr_add_callback(r->next_completion);
grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@ -145,7 +146,7 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
*r->target_config = cfg;
grpc_iomgr_add_callback(r->next_completion);
grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
}
}
@ -154,6 +155,7 @@ static void sockaddr_destroy(grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
grpc_workqueue_unref(r->workqueue);
gpr_free(r->addrs);
gpr_free(r->addrs_len);
gpr_free(r->lb_policy_name);
@ -278,8 +280,7 @@ done:
static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_uri *uri, const char *lb_policy_name,
grpc_subchannel_factory *subchannel_factory,
grpc_resolver_args *args, const char *lb_policy_name,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
size_t i;
int errors_found = 0; /* GPR_FALSE */
@ -287,7 +288,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice path_slice;
gpr_slice_buffer path_parts;
if (0 != strcmp(uri->authority, "")) {
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
return NULL;
}
@ -295,7 +296,8 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing);
path_slice =
gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing);
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
@ -304,7 +306,7 @@ static grpc_resolver *sockaddr_create(
r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs);
for(i = 0; i < r->num_addrs; i++) {
grpc_uri ith_uri = *uri;
grpc_uri ith_uri = *args->uri;
char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) {
@ -324,10 +326,12 @@ static grpc_resolver *sockaddr_create(
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
r->subchannel_factory = subchannel_factory;
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
r->workqueue = args->workqueue;
grpc_workqueue_ref(r->workqueue);
r->lb_policy_name = gpr_strdup(lb_policy_name);
grpc_subchannel_factory_ref(subchannel_factory);
return &r->base;
}
@ -341,10 +345,8 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
#define DECL_FACTORY(name) \
static grpc_resolver *name##_factory_create_resolver( \
grpc_resolver_factory *factory, grpc_uri *uri, \
grpc_subchannel_factory *subchannel_factory) { \
return sockaddr_create(uri, "pick_first", \
subchannel_factory, parse_##name); \
grpc_resolver_factory *factory, grpc_resolver_args *args) { \
return sockaddr_create(args, "pick_first", parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \

@ -76,6 +76,7 @@ typedef struct waiting_for_connect {
struct grpc_subchannel {
grpc_connector *connector;
grpc_workqueue *workqueue;
/** non-transport related channel filters */
const grpc_channel_filter **filters;
@ -575,7 +576,7 @@ static void publish_transport(grpc_subchannel *c) {
connectivity_state_changed_locked(c, "connected");
while ((w4c = c->waiting)) {
c->waiting = w4c->next;
grpc_iomgr_add_callback(&w4c->continuation);
grpc_workqueue_push(c->workqueue, &w4c->continuation, 1);
}
gpr_mu_unlock(&c->mu);

@ -42,6 +42,7 @@ typedef struct {
} grpc_endpoint_pair;
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size);
size_t read_slice_size,
grpc_workqueue *workqueue);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */

@ -59,19 +59,20 @@ static void create_sockets(int sv[2]) {
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size) {
size_t read_slice_size,
grpc_workqueue *workqueue) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
gpr_asprintf(&final_name, "%s:client", name);
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size,
"socketpair-server");
p.client = grpc_tcp_create(grpc_fd_create(sv[1], workqueue, final_name),
read_slice_size, "socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size,
"socketpair-client");
p.server = grpc_tcp_create(grpc_fd_create(sv[0], workqueue, final_name),
read_slice_size, "socketpair-client");
gpr_free(final_name);
return p;
}

@ -71,6 +71,9 @@ static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
if (fd->workqueue->wakeup_read_fd != fd) {
grpc_workqueue_unref(fd->workqueue);
}
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@ -158,8 +161,14 @@ void grpc_fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) {
grpc_fd *r = alloc_fd(fd);
r->workqueue = workqueue;
/* if the wakeup_read_fd is NULL, then the workqueue is under construction
==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */
if (workqueue->wakeup_read_fd != NULL) {
grpc_workqueue_ref(workqueue);
}
grpc_iomgr_register_object(&r->iomgr_object, name);
return r;
}
@ -220,7 +229,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure) {
grpc_iomgr_add_callback(fd->on_done_closure);
grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1);
}
} else {
wake_all_watchers_locked(fd);
@ -246,19 +255,19 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static void process_callback(grpc_iomgr_closure *closure, int success,
int allow_synchronous_callback) {
if (allow_synchronous_callback) {
grpc_workqueue *optional_workqueue) {
if (optional_workqueue == NULL) {
closure->cb(closure->cb_arg, success);
} else {
grpc_iomgr_add_delayed_callback(closure, success);
grpc_workqueue_push(optional_workqueue, closure, success);
}
}
static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
int success, int allow_synchronous_callback) {
int success, grpc_workqueue *optional_workqueue) {
size_t i;
for (i = 0; i < n; i++) {
process_callback(callbacks + i, success, allow_synchronous_callback);
process_callback(callbacks + i, success, optional_workqueue);
}
}
@ -286,7 +295,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
allow_synchronous_callback);
allow_synchronous_callback ? NULL : fd->workqueue);
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
@ -339,7 +348,8 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
success = !gpr_atm_acq_load(&fd->shutdown);
GPR_ASSERT(ncb <= 1);
if (ncb > 0) {
process_callbacks(closure, ncb, success, allow_synchronous_callback);
process_callbacks(closure, ncb, success,
allow_synchronous_callback ? NULL : fd->workqueue);
}
}
@ -441,7 +451,7 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure != NULL) {
grpc_iomgr_add_callback(fd->on_done_closure);
grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1);
}
}
gpr_mu_unlock(&fd->watcher_mu);

@ -36,6 +36,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/workqueue.h"
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@ -57,6 +58,7 @@ struct grpc_fd {
meaning that mostly we ref by two to avoid altering the orphaned bit,
and just unref by 1 when we're ready to flag the object as orphaned */
gpr_atm refst;
grpc_workqueue *workqueue;
gpr_mu set_state_mu;
gpr_atm shutdown;
@ -103,7 +105,7 @@ struct grpc_fd {
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd, const char *name);
grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name);
/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.

@ -57,7 +57,6 @@ void grpc_kick_poller(void) {
}
void grpc_iomgr_init(void) {
gpr_thd_id id;
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
@ -65,8 +64,6 @@ void grpc_iomgr_init(void) {
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
gpr_event_init(&g_background_callback_executor_done);
gpr_thd_new(&id, background_callback_executor, NULL, NULL);
}
static size_t count_objects(void) {
@ -86,58 +83,36 @@ static void dump_objects(const char *kind) {
}
void grpc_iomgr_shutdown(void) {
grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN));
gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_mu_lock(&g_mu);
g_shutdown = 1;
while (g_cbs_head != NULL || g_root_object.next != &g_root_object) {
while (g_root_object.next != &g_root_object) {
if (gpr_time_cmp(
gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time),
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG,
"Waiting for %d iomgr objects to be destroyed and executing "
"final callbacks",
count_objects());
} else if (g_cbs_head != NULL) {
gpr_log(GPR_DEBUG, "Executing final iomgr callbacks");
} else {
if (g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
count_objects());
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
if (g_cbs_head) {
do {
closure = g_cbs_head;
g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
closure->cb(closure->cb_arg, 0);
gpr_mu_lock(&g_mu);
} while (g_cbs_head);
continue;
}
if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
continue;
}
if (g_root_object.next != &g_root_object) {
int timeout = 0;
while (g_cbs_head == NULL) {
gpr_timespec short_deadline = gpr_time_add(
gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
timeout = 1;
break;
}
if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) {
if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
timeout = 1;
break;
}
}
if (timeout) {
if (timeout && g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
@ -149,10 +124,6 @@ void grpc_iomgr_shutdown(void) {
}
gpr_mu_unlock(&g_mu);
grpc_kick_poller();
gpr_event_wait(&g_background_callback_executor_done,
gpr_inf_future(GPR_CLOCK_REALTIME));
grpc_alarm_list_shutdown();
grpc_iomgr_platform_shutdown();
@ -184,67 +155,3 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
closure->cb_arg = cb_arg;
closure->next = NULL;
}
static void assert_not_scheduled_locked(grpc_iomgr_closure *closure) {
#ifndef NDEBUG
grpc_iomgr_closure *c;
for (c = g_cbs_head; c; c = c->next) {
GPR_ASSERT(c != closure);
}
#endif
}
void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
closure->success = success;
GPR_ASSERT(closure->cb);
gpr_mu_lock(&g_mu);
assert_not_scheduled_locked(closure);
closure->next = NULL;
if (!g_cbs_tail) {
g_cbs_head = g_cbs_tail = closure;
} else {
g_cbs_tail->next = closure;
g_cbs_tail = closure;
}
if (g_shutdown) {
gpr_cv_signal(&g_rcv);
}
gpr_mu_unlock(&g_mu);
}
void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) {
grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */);
}
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
int n = 0;
gpr_mu *retake_mu = NULL;
grpc_iomgr_closure *closure;
for (;;) {
/* check for new work */
if (!gpr_mu_trylock(&g_mu)) {
break;
}
closure = g_cbs_head;
if (!closure) {
gpr_mu_unlock(&g_mu);
break;
}
g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
/* if we have a mutex to drop, do so before executing work */
if (drop_mu) {
gpr_mu_unlock(drop_mu);
retake_mu = drop_mu;
drop_mu = NULL;
}
closure->cb(closure->cb_arg, success && closure->success);
n++;
}
if (retake_mu) {
gpr_mu_lock(retake_mu);
}
return n;
}

@ -127,7 +127,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
GRPC_FD_REF(fd, "delayed_add");
grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
grpc_iomgr_add_callback(&da->closure);
grpc_workqueue_push(fd->workqueue, &da->closure, 1);
}
}

@ -293,7 +293,7 @@ static void basic_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
grpc_iomgr_add_callback(&up_args->promotion_closure);
grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
gpr_mu_unlock(&pollset->mu);
return;
}
@ -385,7 +385,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->original_vtable = pollset->vtable;
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
grpc_iomgr_add_callback(&up_args->promotion_closure);
grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

@ -46,6 +46,7 @@
in this connection being established (in order to continue their work) */
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
void *arg, grpc_pollset_set *interested_parties,
grpc_workqueue *workqueue,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline);

@ -41,21 +41,6 @@
#include <grpc/support/alloc.h>
#include "src/core/iomgr/fd_posix.h"
struct grpc_workqueue {
gpr_refcount refs;
gpr_mu mu;
grpc_iomgr_closure head;
grpc_iomgr_closure *tail;
grpc_wakeup_fd wakeup_fd;
grpc_fd *wakeup_read_fd;
grpc_iomgr_closure read_closure;
};
static void on_readable(void *arg, int success);
grpc_workqueue *grpc_workqueue_create(void) {

@ -34,4 +34,19 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
struct grpc_fd;
struct grpc_workqueue {
gpr_refcount refs;
gpr_mu mu;
grpc_iomgr_closure head;
grpc_iomgr_closure *tail;
grpc_wakeup_fd wakeup_fd;
struct grpc_fd *wakeup_read_fd;
grpc_iomgr_closure read_closure;
};
#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */

@ -57,6 +57,7 @@ typedef struct {
gpr_refcount refs;
grpc_channel_security_connector *security_connector;
grpc_workqueue *workqueue;
grpc_iomgr_closure *notify;
grpc_connect_in_args args;
@ -71,6 +72,7 @@ static void connector_ref(grpc_connector *con) {
static void connector_unref(grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
grpc_workqueue_unref(c->workqueue);
gpr_free(c);
}
}
@ -122,8 +124,8 @@ static void connector_connect(grpc_connector *con,
c->notify = notify;
c->args = *args;
c->result = result;
grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
args->addr_len, args->deadline);
grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue,
args->addr, args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
@ -165,6 +167,8 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
c->workqueue = grpc_channel_get_workqueue(f->master);
grpc_workqueue_ref(c->workqueue);
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
args->args = final_args;
@ -240,7 +244,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
f->merge_args = grpc_channel_args_copy(args_copy);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
resolver = grpc_resolver_create(target, &f->base);
resolver = grpc_resolver_create(target, &f->base,
grpc_channel_get_workqueue(channel));
if (!resolver) {
return NULL;
}

Loading…
Cancel
Save