Core compiles with workqueues

pull/3423/head
Craig Tiller 9 years ago
parent 97419e4e7d
commit 47a708e252
  1. 5
      src/core/channel/client_channel.c
  2. 2
      src/core/client_config/connector.h
  3. 4
      src/core/client_config/lb_policies/pick_first.c
  4. 34
      src/core/client_config/resolvers/zookeeper_resolver.c
  5. 7
      src/core/client_config/subchannel.c
  6. 73
      src/core/httpcli/httpcli.c
  7. 3
      src/core/iomgr/tcp_client_posix.c
  8. 2
      src/core/iomgr/tcp_posix.c
  9. 9
      src/core/iomgr/tcp_server_posix.c
  10. 6
      src/core/iomgr/udp_server.c
  11. 6
      src/core/iomgr/workqueue_posix.c
  12. 11
      src/core/security/server_secure_chttp2.c
  13. 3
      src/core/surface/call.c
  14. 7
      src/core/surface/channel.c
  15. 3
      src/core/surface/channel.h
  16. 3
      src/core/surface/channel_connectivity.c
  17. 17
      src/core/surface/channel_create.c
  18. 3
      src/core/surface/lame_client.c
  19. 21
      src/core/surface/secure_channel_create.c
  20. 31
      src/core/surface/server.c
  21. 1
      src/core/surface/server.h
  22. 9
      src/core/surface/server_chttp2.c
  23. 4
      src/core/transport/chttp2/frame_ping.c
  24. 10
      src/core/transport/chttp2_transport.c
  25. 3
      src/core/transport/chttp2_transport.h
  26. 15
      src/core/transport/connectivity_state.c
  27. 4
      src/core/transport/connectivity_state.h
  28. 5
      test/core/bad_client/bad_client.c

@ -670,8 +670,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand); chand);
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&chand->state_tracker,
"client_channel"); grpc_channel_get_workqueue(master),
GRPC_CHANNEL_IDLE, "client_channel");
} }
/* Destructor for channel_data */ /* Destructor for channel_data */

@ -57,6 +57,8 @@ typedef struct {
const grpc_channel_args *channel_args; const grpc_channel_args *channel_args;
/** metadata context */ /** metadata context */
grpc_mdctx *metadata_context; grpc_mdctx *metadata_context;
/** workqueue */
grpc_workqueue *workqueue;
} grpc_connect_in_args; } grpc_connect_in_args;
typedef struct { typedef struct {

@ -332,8 +332,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
p->num_subchannels = args->num_subchannels; p->num_subchannels = args->num_subchannels;
p->workqueue = args->workqueue; p->workqueue = args->workqueue;
grpc_workqueue_ref(p->workqueue); grpc_workqueue_ref(p->workqueue);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&p->state_tracker, args->workqueue,
"pick_first"); GRPC_CHANNEL_IDLE, "pick_first");
memcpy(p->subchannels, args->subchannels, memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels); sizeof(grpc_subchannel *) * args->num_subchannels);
grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);

@ -61,6 +61,8 @@ typedef struct {
grpc_subchannel_factory *subchannel_factory; grpc_subchannel_factory *subchannel_factory;
/** load balancing policy name */ /** load balancing policy name */
char *lb_policy_name; char *lb_policy_name;
/** work queue */
grpc_workqueue *workqueue;
/** mutex guarding the rest of the state */ /** mutex guarding the rest of the state */
gpr_mu mu; gpr_mu mu;
@ -108,7 +110,7 @@ static void zookeeper_shutdown(grpc_resolver *resolver) {
gpr_mu_lock(&r->mu); gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) { if (r->next_completion != NULL) {
*r->target_config = NULL; *r->target_config = NULL;
grpc_iomgr_add_callback(r->next_completion); grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL; r->next_completion = NULL;
} }
zookeeper_close(r->zookeeper_handle); zookeeper_close(r->zookeeper_handle);
@ -409,7 +411,7 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
if (r->resolved_config != NULL) { if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config); grpc_client_config_ref(r->resolved_config);
} }
grpc_iomgr_add_callback(r->next_completion); grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL; r->next_completion = NULL;
r->published_version = r->resolved_version; r->published_version = r->resolved_version;
} }
@ -422,19 +424,19 @@ static void zookeeper_destroy(grpc_resolver *gr) {
grpc_client_config_unref(r->resolved_config); grpc_client_config_unref(r->resolved_config);
} }
grpc_subchannel_factory_unref(r->subchannel_factory); grpc_subchannel_factory_unref(r->subchannel_factory);
grpc_workqueue_unref(r->workqueue);
gpr_free(r->name); gpr_free(r->name);
gpr_free(r->lb_policy_name); gpr_free(r->lb_policy_name);
gpr_free(r); gpr_free(r);
} }
static grpc_resolver *zookeeper_create( static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
grpc_uri *uri, const char *lb_policy_name, const char *lb_policy_name) {
grpc_subchannel_factory *subchannel_factory) {
zookeeper_resolver *r; zookeeper_resolver *r;
size_t length; size_t length;
char *path = uri->path; char *path = args->uri->path;
if (0 == strcmp(uri->authority, "")) { if (0 == strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "No authority specified in zookeeper uri"); gpr_log(GPR_ERROR, "No authority specified in zookeeper uri");
return NULL; return NULL;
} }
@ -452,14 +454,19 @@ static grpc_resolver *zookeeper_create(
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path); r->name = gpr_strdup(path);
r->subchannel_factory = subchannel_factory; r->workqueue = args->workqueue;
grpc_workqueue_ref(r->workqueue);
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name); r->lb_policy_name = gpr_strdup(lb_policy_name);
grpc_subchannel_factory_ref(subchannel_factory);
/** Initializes zookeeper client */ /** Initializes zookeeper client */
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher, r->zookeeper_handle =
GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); zookeeper_init(args->uri->authority, zookeeper_global_watcher,
GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
if (r->zookeeper_handle == NULL) { if (r->zookeeper_handle == NULL) {
gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
return NULL; return NULL;
@ -490,9 +497,8 @@ static char *zookeeper_factory_get_default_hostname(
} }
static grpc_resolver *zookeeper_factory_create_resolver( static grpc_resolver *zookeeper_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri, grpc_resolver_factory *factory, grpc_resolver_args *args) {
grpc_subchannel_factory *subchannel_factory) { return zookeeper_create(args, "pick_first");
return zookeeper_create(uri, "pick_first", subchannel_factory);
} }
static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { static const grpc_resolver_factory_vtable zookeeper_factory_vtable = {

@ -260,6 +260,7 @@ static void subchannel_destroy(grpc_subchannel *c) {
grpc_mdctx_unref(c->mdctx); grpc_mdctx_unref(c->mdctx);
grpc_connectivity_state_destroy(&c->state_tracker); grpc_connectivity_state_destroy(&c->state_tracker);
grpc_connector_unref(c->connector); grpc_connector_unref(c->connector);
grpc_workqueue_unref(c->workqueue);
gpr_free(c); gpr_free(c);
} }
@ -296,12 +297,14 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->args = grpc_channel_args_copy(args->args); c->args = grpc_channel_args_copy(args->args);
c->mdctx = args->mdctx; c->mdctx = args->mdctx;
c->master = args->master; c->master = args->master;
c->workqueue = grpc_channel_get_workqueue(c->master);
grpc_workqueue_ref(c->workqueue);
c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
c->random = random_seed(); c->random = random_seed();
grpc_mdctx_ref(c->mdctx); grpc_mdctx_ref(c->mdctx);
grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&c->state_tracker, c->workqueue,
"subchannel"); GRPC_CHANNEL_IDLE, "subchannel");
gpr_mu_init(&c->mu); gpr_mu_init(&c->mu);
return c; return c;
} }

@ -65,6 +65,7 @@ typedef struct {
gpr_slice_buffer outgoing; gpr_slice_buffer outgoing;
grpc_iomgr_closure on_read; grpc_iomgr_closure on_read;
grpc_iomgr_closure done_write; grpc_iomgr_closure done_write;
grpc_workqueue *workqueue;
} internal_request; } internal_request;
static grpc_httpcli_get_override g_get_override = NULL; static grpc_httpcli_get_override g_get_override = NULL;
@ -105,6 +106,7 @@ static void finish(internal_request *req, int success) {
grpc_iomgr_unregister_object(&req->iomgr_obj); grpc_iomgr_unregister_object(&req->iomgr_obj);
gpr_slice_buffer_destroy(&req->incoming); gpr_slice_buffer_destroy(&req->incoming);
gpr_slice_buffer_destroy(&req->outgoing); gpr_slice_buffer_destroy(&req->outgoing);
grpc_workqueue_unref(req->workqueue);
gpr_free(req); gpr_free(req);
} }
@ -202,8 +204,8 @@ static void next_address(internal_request *req) {
} }
addr = &req->addresses->addrs[req->next_address++]; addr = &req->addresses->addrs[req->next_address++];
grpc_tcp_client_connect(on_connected, req, &req->context->pollset_set, grpc_tcp_client_connect(on_connected, req, &req->context->pollset_set,
(struct sockaddr *)&addr->addr, addr->len, req->workqueue, (struct sockaddr *)&addr->addr,
req->deadline); addr->len, req->deadline);
} }
static void on_resolved(void *arg, grpc_resolved_addresses *addresses) { static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
@ -217,19 +219,16 @@ static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
next_address(req); next_address(req);
} }
void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, static void internal_request_begin(grpc_httpcli_context *context,
const grpc_httpcli_request *request, grpc_pollset *pollset,
gpr_timespec deadline, const grpc_httpcli_request *request,
grpc_httpcli_response_cb on_response, void *user_data) { gpr_timespec deadline,
internal_request *req; grpc_httpcli_response_cb on_response,
char *name; void *user_data, const char *name,
if (g_get_override && gpr_slice request_text) {
g_get_override(request, deadline, on_response, user_data)) { internal_request *req = gpr_malloc(sizeof(internal_request));
return;
}
req = gpr_malloc(sizeof(internal_request));
memset(req, 0, sizeof(*req)); memset(req, 0, sizeof(*req));
req->request_text = grpc_httpcli_format_get_request(request); req->request_text = request_text;
grpc_httpcli_parser_init(&req->parser); grpc_httpcli_parser_init(&req->parser);
req->on_response = on_response; req->on_response = on_response;
req->user_data = user_data; req->user_data = user_data;
@ -242,51 +241,47 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
grpc_iomgr_closure_init(&req->done_write, done_write, req); grpc_iomgr_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming); gpr_slice_buffer_init(&req->incoming);
gpr_slice_buffer_init(&req->outgoing); gpr_slice_buffer_init(&req->outgoing);
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name); grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name);
req->host = gpr_strdup(request->host); req->host = gpr_strdup(request->host);
req->workqueue = grpc_workqueue_create();
grpc_workqueue_add_to_pollset(req->workqueue, pollset);
grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset); grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset);
grpc_resolve_address(request->host, req->handshaker->default_port, grpc_resolve_address(request->host, req->handshaker->default_port,
on_resolved, req); on_resolved, req);
} }
void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
const grpc_httpcli_request *request,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data) {
char *name;
if (g_get_override &&
g_get_override(request, deadline, on_response, user_data)) {
return;
}
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
internal_request_begin(context, pollset, request, deadline, on_response,
user_data, name,
grpc_httpcli_format_get_request(request));
gpr_free(name);
}
void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
const grpc_httpcli_request *request, const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size, const char *body_bytes, size_t body_size,
gpr_timespec deadline, gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data) { grpc_httpcli_response_cb on_response, void *user_data) {
internal_request *req;
char *name; char *name;
if (g_post_override && g_post_override(request, body_bytes, body_size, if (g_post_override && g_post_override(request, body_bytes, body_size,
deadline, on_response, user_data)) { deadline, on_response, user_data)) {
return; return;
} }
req = gpr_malloc(sizeof(internal_request));
memset(req, 0, sizeof(*req));
req->request_text =
grpc_httpcli_format_post_request(request, body_bytes, body_size);
grpc_httpcli_parser_init(&req->parser);
req->on_response = on_response;
req->user_data = user_data;
req->deadline = deadline;
req->handshaker =
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
grpc_iomgr_closure_init(&req->on_read, on_read, req);
grpc_iomgr_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming);
gpr_slice_buffer_init(&req->outgoing);
gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path); gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name); internal_request_begin(
context, pollset, request, deadline, on_response, user_data, name,
grpc_httpcli_format_post_request(request, body_bytes, body_size));
gpr_free(name); gpr_free(name);
req->host = gpr_strdup(request->host);
grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset);
grpc_resolve_address(request->host, req->handshaker->default_port,
on_resolved, req);
} }
void grpc_httpcli_set_override(grpc_httpcli_get_override get, void grpc_httpcli_set_override(grpc_httpcli_get_override get,

@ -195,6 +195,7 @@ finish:
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
void *arg, grpc_pollset_set *interested_parties, void *arg, grpc_pollset_set *interested_parties,
grpc_workqueue *workqueue,
const struct sockaddr *addr, size_t addr_len, const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline) { gpr_timespec deadline) {
int fd; int fd;
@ -236,7 +237,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
addr_str = grpc_sockaddr_to_uri(addr); addr_str = grpc_sockaddr_to_uri(addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str); gpr_asprintf(&name, "tcp-client:%s", addr_str);
fdobj = grpc_fd_create(fd, name); fdobj = grpc_fd_create(fd, workqueue, name);
if (err >= 0) { if (err >= 0) {
cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));

@ -261,7 +261,7 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
tcp->finished_edge = 0; tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else { } else {
grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1); grpc_workqueue_push(tcp->em_fd->workqueue, &tcp->read_closure, 1);
} }
/* TODO(ctiller): immediate return */ /* TODO(ctiller): immediate return */
return GRPC_ENDPOINT_PENDING; return GRPC_ENDPOINT_PENDING;

@ -124,6 +124,9 @@ struct grpc_tcp_server {
grpc_pollset **pollsets; grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */ /* number of pollsets in the pollsets array */
size_t pollset_count; size_t pollset_count;
/** workqueue for interally created async work */
grpc_workqueue *workqueue;
}; };
grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *grpc_tcp_server_create(void) {
@ -137,6 +140,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0; s->nports = 0;
s->port_capacity = INIT_PORT_CAP; s->port_capacity = INIT_PORT_CAP;
s->workqueue = grpc_workqueue_create();
return s; return s;
} }
@ -147,6 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) {
gpr_mu_destroy(&s->mu); gpr_mu_destroy(&s->mu);
gpr_free(s->ports); gpr_free(s->ports);
grpc_workqueue_unref(s->workqueue);
gpr_free(s); gpr_free(s);
} }
@ -339,7 +344,7 @@ static void on_read(void *arg, int success) {
addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
fdobj = grpc_fd_create(fd, name); fdobj = grpc_fd_create(fd, sp->server->workqueue, name);
/* TODO(ctiller): revise this when we have server-side sharding /* TODO(ctiller): revise this when we have server-side sharding
of channels -- we certainly should not be automatically adding every of channels -- we certainly should not be automatically adding every
incoming channel to every pollset owned by the server */ incoming channel to every pollset owned by the server */
@ -387,7 +392,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
sp = &s->ports[s->nports++]; sp = &s->ports[s->nports++];
sp->server = s; sp->server = s;
sp->fd = fd; sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name); sp->emfd = grpc_fd_create(fd, s->workqueue, name);
memcpy(sp->addr.untyped, addr, addr_len); memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len; sp->addr_len = addr_len;
GPR_ASSERT(sp->emfd); GPR_ASSERT(sp->emfd);

@ -121,6 +121,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets; grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */ /* number of pollsets in the pollsets array */
size_t pollset_count; size_t pollset_count;
grpc_workqueue *workqueue;
}; };
grpc_udp_server *grpc_udp_server_create(void) { grpc_udp_server *grpc_udp_server_create(void) {
@ -135,6 +137,7 @@ grpc_udp_server *grpc_udp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0; s->nports = 0;
s->port_capacity = INIT_PORT_CAP; s->port_capacity = INIT_PORT_CAP;
s->workqueue = grpc_workqueue_create();
return s; return s;
} }
@ -146,6 +149,7 @@ static void finish_shutdown(grpc_udp_server *s) {
gpr_cv_destroy(&s->cv); gpr_cv_destroy(&s->cv);
gpr_free(s->ports); gpr_free(s->ports);
grpc_workqueue_unref(s->workqueue);
gpr_free(s); gpr_free(s);
} }
@ -310,7 +314,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp = &s->ports[s->nports++]; sp = &s->ports[s->nports++];
sp->server = s; sp->server = s;
sp->fd = fd; sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name); sp->emfd = grpc_fd_create(fd, s->workqueue, name);
memcpy(sp->addr.untyped, addr, addr_len); memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len; sp->addr_len = addr_len;
sp->read_cb = read_cb; sp->read_cb = read_cb;

@ -35,6 +35,7 @@
#ifdef GPR_POSIX_SOCKET #ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/workqueue.h" #include "src/core/iomgr/workqueue.h"
#include <stdio.h> #include <stdio.h>
@ -52,8 +53,9 @@ grpc_workqueue *grpc_workqueue_create(void) {
workqueue->tail = &workqueue->head; workqueue->tail = &workqueue->head;
grpc_wakeup_fd_init(&workqueue->wakeup_fd); grpc_wakeup_fd_init(&workqueue->wakeup_fd);
sprintf(name, "workqueue:%p", (void *)workqueue); sprintf(name, "workqueue:%p", (void *)workqueue);
workqueue->wakeup_read_fd = workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */
grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); workqueue->wakeup_read_fd = grpc_fd_create(
GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name);
grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue); grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
return workqueue; return workqueue;

@ -85,7 +85,7 @@ static void state_unref(grpc_server_secure_state *state) {
} }
static void setup_transport(void *statep, grpc_transport *transport, static void setup_transport(void *statep, grpc_transport *transport,
grpc_mdctx *mdctx) { grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
static grpc_channel_filter const *extra_filters[] = { static grpc_channel_filter const *extra_filters[] = {
&grpc_server_auth_filter, &grpc_http_server_filter}; &grpc_server_auth_filter, &grpc_http_server_filter};
grpc_server_secure_state *state = statep; grpc_server_secure_state *state = statep;
@ -98,7 +98,8 @@ static void setup_transport(void *statep, grpc_transport *transport,
grpc_server_get_channel_args(state->server), args_to_add, grpc_server_get_channel_args(state->server), args_to_add,
GPR_ARRAY_SIZE(args_to_add)); GPR_ARRAY_SIZE(args_to_add));
grpc_server_setup_transport(state->server, transport, extra_filters, grpc_server_setup_transport(state->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx, args_copy); GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
args_copy);
grpc_channel_args_destroy(args_copy); grpc_channel_args_destroy(args_copy);
} }
@ -130,15 +131,17 @@ static void on_secure_transport_setup_done(void *statep,
grpc_server_secure_state *state = statep; grpc_server_secure_state *state = statep;
grpc_transport *transport; grpc_transport *transport;
grpc_mdctx *mdctx; grpc_mdctx *mdctx;
grpc_workqueue *workqueue;
if (status == GRPC_SECURITY_OK) { if (status == GRPC_SECURITY_OK) {
gpr_mu_lock(&state->mu); gpr_mu_lock(&state->mu);
remove_tcp_from_list_locked(state, wrapped_endpoint); remove_tcp_from_list_locked(state, wrapped_endpoint);
if (!state->is_shutdown) { if (!state->is_shutdown) {
mdctx = grpc_mdctx_create(); mdctx = grpc_mdctx_create();
workqueue = grpc_workqueue_create();
transport = grpc_create_chttp2_transport( transport = grpc_create_chttp2_transport(
grpc_server_get_channel_args(state->server), secure_endpoint, mdctx, grpc_server_get_channel_args(state->server), secure_endpoint, mdctx,
0); workqueue, 0);
setup_transport(state, transport, mdctx); setup_transport(state, transport, mdctx, workqueue);
grpc_chttp2_transport_start_reading(transport, NULL, 0); grpc_chttp2_transport_start_reading(transport, NULL, 0);
} else { } else {
/* We need to consume this here, because the server may already have gone /* We need to consume this here, because the server may already have gone

@ -499,7 +499,8 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
} else { } else {
c->destroy_closure.cb = destroy_call; c->destroy_closure.cb = destroy_call;
c->destroy_closure.cb_arg = c; c->destroy_closure.cb_arg = c;
grpc_iomgr_add_callback(&c->destroy_closure); grpc_workqueue_push(grpc_channel_get_workqueue(c->channel),
&c->destroy_closure, 1);
} }
} }
} }

@ -79,6 +79,7 @@ struct grpc_channel {
registered_call *registered_calls; registered_call *registered_calls;
grpc_iomgr_closure destroy_closure; grpc_iomgr_closure destroy_closure;
char *target; char *target;
grpc_workqueue *workqueue;
}; };
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@ -92,7 +93,8 @@ struct grpc_channel {
grpc_channel *grpc_channel_create_from_filters( grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t num_filters, const char *target, const grpc_channel_filter **filters, size_t num_filters,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
int is_client) {
size_t i; size_t i;
size_t size = size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
@ -104,6 +106,7 @@ grpc_channel *grpc_channel_create_from_filters(
/* decremented by grpc_channel_destroy */ /* decremented by grpc_channel_destroy */
gpr_ref_init(&channel->refs, 1); gpr_ref_init(&channel->refs, 1);
channel->metadata_context = mdctx; channel->metadata_context = mdctx;
channel->workqueue = workqueue;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0); channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0);
channel->grpc_compression_algorithm_string = channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
@ -311,7 +314,7 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
if (gpr_unref(&channel->refs)) { if (gpr_unref(&channel->refs)) {
channel->destroy_closure.cb = destroy_channel; channel->destroy_closure.cb = destroy_channel;
channel->destroy_closure.cb_arg = channel; channel->destroy_closure.cb_arg = channel;
grpc_iomgr_add_callback(&channel->destroy_closure); grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1);
} }
} }

@ -40,7 +40,8 @@
grpc_channel *grpc_channel_create_from_filters( grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t count, const char *target, const grpc_channel_filter **filters, size_t count,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
int is_client);
/** Get a (borrowed) pointer to this channels underlying channel stack */ /** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);

@ -176,7 +176,8 @@ void grpc_channel_watch_connectivity_state(
"grpc_channel_watch_connectivity_state called on something that is " "grpc_channel_watch_connectivity_state called on something that is "
"not a client channel, but '%s'", "not a client channel, but '%s'",
client_channel_elem->filter->name); client_channel_elem->filter->name);
grpc_iomgr_add_delayed_callback(&w->on_complete, 1); grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete,
1);
} else { } else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem, grpc_client_channel_add_interested_party(client_channel_elem,

@ -74,7 +74,8 @@ static void connected(void *arg, grpc_endpoint *tcp) {
grpc_iomgr_closure *notify; grpc_iomgr_closure *notify;
if (tcp != NULL) { if (tcp != NULL) {
c->result->transport = grpc_create_chttp2_transport( c->result->transport = grpc_create_chttp2_transport(
c->args.channel_args, tcp, c->args.metadata_context, 1); c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue,
1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
GPR_ASSERT(c->result->transport); GPR_ASSERT(c->result->transport);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
@ -85,7 +86,7 @@ static void connected(void *arg, grpc_endpoint *tcp) {
} }
notify = c->notify; notify = c->notify;
c->notify = NULL; c->notify = NULL;
grpc_iomgr_add_callback(notify); notify->cb(notify->cb_arg, 1);
} }
static void connector_connect(grpc_connector *con, static void connector_connect(grpc_connector *con,
@ -98,8 +99,9 @@ static void connector_connect(grpc_connector *con,
c->notify = notify; c->notify = notify;
c->args = *args; c->args = *args;
c->result = result; c->result = result;
grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, grpc_tcp_client_connect(connected, c, args->interested_parties,
args->addr_len, args->deadline); args->workqueue, args->addr, args->addr_len,
args->deadline);
} }
static const grpc_connector_vtable connector_vtable = { static const grpc_connector_vtable connector_vtable = {
@ -164,6 +166,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_resolver *resolver; grpc_resolver *resolver;
subchannel_factory *f; subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_workqueue *workqueue = grpc_workqueue_create();
size_t n = 0; size_t n = 0;
GPR_ASSERT(!reserved); GPR_ASSERT(!reserved);
if (grpc_channel_args_is_census_enabled(args)) { if (grpc_channel_args_is_census_enabled(args)) {
@ -173,8 +176,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
filters[n++] = &grpc_client_channel_filter; filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS); GPR_ASSERT(n <= MAX_FILTERS);
channel = channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx,
grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1); workqueue, 1);
f = gpr_malloc(sizeof(*f)); f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable; f->base.vtable = &subchannel_factory_vtable;
@ -184,7 +187,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
f->merge_args = grpc_channel_args_copy(args); f->merge_args = grpc_channel_args_copy(args);
f->master = channel; f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
resolver = grpc_resolver_create(target, &f->base); resolver = grpc_resolver_create(target, &f->base, workqueue);
if (!resolver) { if (!resolver) {
return NULL; return NULL;
} }

@ -150,7 +150,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
channel_data *chand; channel_data *chand;
static const grpc_channel_filter *filters[] = {&lame_filter}; static const grpc_channel_filter *filters[] = {&lame_filter};
channel = grpc_channel_create_from_filters(target, filters, 1, NULL, channel = grpc_channel_create_from_filters(target, filters, 1, NULL,
grpc_mdctx_create(), 1); grpc_mdctx_create(),
grpc_workqueue_create(), 1);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GPR_ASSERT(elem->filter == &lame_filter); GPR_ASSERT(elem->filter == &lame_filter);
chand = (channel_data *)elem->channel_data; chand = (channel_data *)elem->channel_data;

@ -57,7 +57,6 @@ typedef struct {
gpr_refcount refs; gpr_refcount refs;
grpc_channel_security_connector *security_connector; grpc_channel_security_connector *security_connector;
grpc_workqueue *workqueue;
grpc_iomgr_closure *notify; grpc_iomgr_closure *notify;
grpc_connect_in_args args; grpc_connect_in_args args;
@ -72,7 +71,6 @@ static void connector_ref(grpc_connector *con) {
static void connector_unref(grpc_connector *con) { static void connector_unref(grpc_connector *con) {
connector *c = (connector *)con; connector *c = (connector *)con;
if (gpr_unref(&c->refs)) { if (gpr_unref(&c->refs)) {
grpc_workqueue_unref(c->workqueue);
gpr_free(c); gpr_free(c);
} }
} }
@ -88,7 +86,8 @@ static void on_secure_transport_setup_done(void *arg,
memset(c->result, 0, sizeof(*c->result)); memset(c->result, 0, sizeof(*c->result));
} else { } else {
c->result->transport = grpc_create_chttp2_transport( c->result->transport = grpc_create_chttp2_transport(
c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); c->args.channel_args, secure_endpoint, c->args.metadata_context,
c->args.workqueue, 1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
c->result->filters[0] = &grpc_http_client_filter; c->result->filters[0] = &grpc_http_client_filter;
@ -124,8 +123,9 @@ static void connector_connect(grpc_connector *con,
c->notify = notify; c->notify = notify;
c->args = *args; c->args = *args;
c->result = result; c->result = result;
grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue, grpc_tcp_client_connect(connected, c, args->interested_parties,
args->addr, args->addr_len, args->deadline); args->workqueue, args->addr, args->addr_len,
args->deadline);
} }
static const grpc_connector_vtable connector_vtable = { static const grpc_connector_vtable connector_vtable = {
@ -167,8 +167,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
memset(c, 0, sizeof(*c)); memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable; c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector; c->security_connector = f->security_connector;
c->workqueue = grpc_channel_get_workqueue(f->master);
grpc_workqueue_ref(c->workqueue);
gpr_ref_init(&c->refs, 1); gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx; args->mdctx = f->mdctx;
args->args = final_args; args->args = final_args;
@ -197,6 +195,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_channel_args *new_args_from_connector; grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *connector; grpc_channel_security_connector *connector;
grpc_mdctx *mdctx; grpc_mdctx *mdctx;
grpc_workqueue *workqueue;
grpc_resolver *resolver; grpc_resolver *resolver;
subchannel_factory *f; subchannel_factory *f;
#define MAX_FILTERS 3 #define MAX_FILTERS 3
@ -219,6 +218,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
"Failed to create security connector."); "Failed to create security connector.");
} }
mdctx = grpc_mdctx_create(); mdctx = grpc_mdctx_create();
workqueue = grpc_workqueue_create();
connector_arg = grpc_security_connector_to_arg(&connector->base); connector_arg = grpc_security_connector_to_arg(&connector->base);
args_copy = grpc_channel_args_copy_and_add( args_copy = grpc_channel_args_copy_and_add(
@ -231,8 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
filters[n++] = &grpc_client_channel_filter; filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS); GPR_ASSERT(n <= MAX_FILTERS);
channel = channel = grpc_channel_create_from_filters(target, filters, n, args_copy,
grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1); mdctx, workqueue, 1);
f = gpr_malloc(sizeof(*f)); f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable; f->base.vtable = &subchannel_factory_vtable;
@ -244,8 +244,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
f->merge_args = grpc_channel_args_copy(args_copy); f->merge_args = grpc_channel_args_copy(args_copy);
f->master = channel; f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
resolver = grpc_resolver_create(target, &f->base, resolver = grpc_resolver_create(target, &f->base, workqueue);
grpc_channel_get_workqueue(channel));
if (!resolver) { if (!resolver) {
return NULL; return NULL;
} }

@ -219,6 +219,8 @@ struct grpc_server {
/** when did we print the last shutdown progress message */ /** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time; gpr_timespec last_shutdown_message_time;
grpc_workqueue *workqueue;
}; };
#define SERVER_FROM_CALL_ELEM(elem) \ #define SERVER_FROM_CALL_ELEM(elem) \
@ -314,7 +316,7 @@ static void kill_zombie(void *elem, int success) {
} }
static void request_matcher_zombify_all_pending_calls( static void request_matcher_zombify_all_pending_calls(
request_matcher *request_matcher) { request_matcher *request_matcher, grpc_workqueue *workqueue) {
while (request_matcher->pending_head) { while (request_matcher->pending_head) {
call_data *calld = request_matcher->pending_head; call_data *calld = request_matcher->pending_head;
request_matcher->pending_head = calld->pending_next; request_matcher->pending_head = calld->pending_next;
@ -324,7 +326,7 @@ static void request_matcher_zombify_all_pending_calls(
grpc_iomgr_closure_init( grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie, &calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
} }
} }
@ -363,6 +365,7 @@ static void server_delete(grpc_server *server) {
} }
request_matcher_destroy(&server->unregistered_request_matcher); request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist); gpr_stack_lockfree_destroy(server->request_freelist);
grpc_workqueue_unref(server->workqueue);
gpr_free(server->cqs); gpr_free(server->cqs);
gpr_free(server->pollsets); gpr_free(server->pollsets);
gpr_free(server->shutdown_tags); gpr_free(server->shutdown_tags);
@ -401,7 +404,8 @@ static void destroy_channel(channel_data *chand) {
maybe_finish_shutdown(chand->server); maybe_finish_shutdown(chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand; chand->finish_destroy_channel_closure.cb_arg = chand;
grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure); grpc_workqueue_push(chand->server->workqueue,
&chand->finish_destroy_channel_closure, 1);
} }
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
@ -414,7 +418,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
calld->state = ZOMBIED; calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
return; return;
} }
@ -505,10 +509,11 @@ static void kill_pending_work_locked(grpc_server *server) {
registered_method *rm; registered_method *rm;
request_matcher_kill_requests(server, &server->unregistered_request_matcher); request_matcher_kill_requests(server, &server->unregistered_request_matcher);
request_matcher_zombify_all_pending_calls( request_matcher_zombify_all_pending_calls(
&server->unregistered_request_matcher); &server->unregistered_request_matcher, server->workqueue);
for (rm = server->registered_methods; rm; rm = rm->next) { for (rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_kill_requests(server, &rm->request_matcher); request_matcher_kill_requests(server, &rm->request_matcher);
request_matcher_zombify_all_pending_calls(&rm->request_matcher); request_matcher_zombify_all_pending_calls(&rm->request_matcher,
server->workqueue);
} }
} }
@ -561,6 +566,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
static void server_on_recv(void *ptr, int success) { static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr; grpc_call_element *elem = ptr;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_timespec op_deadline; gpr_timespec op_deadline;
if (success && !calld->got_initial_metadata) { if (success && !calld->got_initial_metadata) {
@ -595,7 +601,8 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED; calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_workqueue_push(chand->server->workqueue,
&calld->kill_zombie_closure, 1);
} else { } else {
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
} }
@ -606,7 +613,8 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED; calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_workqueue_push(chand->server->workqueue,
&calld->kill_zombie_closure, 1);
} else if (calld->state == PENDING) { } else if (calld->state == PENDING) {
calld->state = ZOMBIED; calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
@ -799,6 +807,7 @@ grpc_server *grpc_server_create_from_filters(
gpr_ref_init(&server->internal_refcount, 1); gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev = server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data; &server->root_channel_data;
server->workqueue = grpc_workqueue_create();
/* TODO(ctiller): expose a channel_arg for this */ /* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls = 32768; server->max_requested_calls = 32768;
@ -873,6 +882,7 @@ void grpc_server_start(grpc_server *server) {
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]);
} }
for (l = server->listeners; l; l = l->next) { for (l = server->listeners; l; l = l->next) {
@ -883,6 +893,7 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_channel_filter const **extra_filters, grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx, size_t num_extra_filters, grpc_mdctx *mdctx,
grpc_workqueue *workqueue,
const grpc_channel_args *args) { const grpc_channel_args *args) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1; size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters = grpc_channel_filter const **filters =
@ -917,7 +928,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
} }
channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
mdctx, 0); mdctx, workqueue, 0);
chand = (channel_data *)grpc_channel_stack_element( chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0) grpc_channel_get_channel_stack(channel), 0)
->channel_data; ->channel_data;
@ -1119,7 +1130,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
grpc_iomgr_closure_init( grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie, &calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
} else { } else {
GPR_ASSERT(calld->state == PENDING); GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED; calld->state = ACTIVATED;

@ -58,6 +58,7 @@ void grpc_server_listener_destroy_done(void *server);
void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
grpc_channel_filter const **extra_filters, grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx, size_t num_extra_filters, grpc_mdctx *mdctx,
grpc_workqueue *workqueue,
const grpc_channel_args *args); const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);

@ -43,11 +43,11 @@
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
static void setup_transport(void *server, grpc_transport *transport, static void setup_transport(void *server, grpc_transport *transport,
grpc_mdctx *mdctx) { grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
static grpc_channel_filter const *extra_filters[] = { static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter}; &grpc_http_server_filter};
grpc_server_setup_transport(server, transport, extra_filters, grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx, GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
grpc_server_get_channel_args(server)); grpc_server_get_channel_args(server));
} }
@ -60,9 +60,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
* case. * case.
*/ */
grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_workqueue *workqueue = grpc_workqueue_create();
grpc_transport *transport = grpc_create_chttp2_transport( grpc_transport *transport = grpc_create_chttp2_transport(
grpc_server_get_channel_args(server), tcp, mdctx, 0); grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0);
setup_transport(server, transport, mdctx); setup_transport(server, transport, mdctx, workqueue);
grpc_chttp2_transport_start_reading(transport, NULL, 0); grpc_chttp2_transport_start_reading(transport, NULL, 0);
} }

@ -89,7 +89,9 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
for (ping = transport_parsing->pings.next; for (ping = transport_parsing->pings.next;
ping != &transport_parsing->pings; ping = ping->next) { ping != &transport_parsing->pings; ping = ping->next) {
if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) { if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) {
grpc_iomgr_add_delayed_callback(ping->on_recv, 1); /* we know no locks are held here, we may as well just call up
* directly */
ping->on_recv->cb(ping->on_recv->cb_arg, 1);
} }
ping->next->prev = ping->prev; ping->next->prev = ping->prev;
ping->prev->next = ping->next; ping->prev->next = ping->next;

@ -166,7 +166,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
and maybe they hold resources that need to be freed */ and maybe they hold resources that need to be freed */
while (t->global.pings.next != &t->global.pings) { while (t->global.pings.next != &t->global.pings) {
grpc_chttp2_outstanding_ping *ping = t->global.pings.next; grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
grpc_iomgr_add_delayed_callback(ping->on_recv, 0); ping->on_recv->cb(ping->on_recv->cb_arg, 0);
ping->next->prev = ping->prev; ping->next->prev = ping->prev;
ping->prev->next = ping->next; ping->prev->next = ping->next;
gpr_free(ping); gpr_free(ping);
@ -209,7 +209,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport(grpc_chttp2_transport *t, static void init_transport(grpc_chttp2_transport *t,
const grpc_channel_args *channel_args, const grpc_channel_args *channel_args,
grpc_endpoint *ep, grpc_mdctx *mdctx, grpc_endpoint *ep, grpc_mdctx *mdctx,
gpr_uint8 is_client) { grpc_workqueue *workqueue, gpr_uint8 is_client) {
size_t i; size_t i;
int j; int j;
@ -242,7 +242,7 @@ static void init_transport(grpc_chttp2_transport *t,
t->parsing.deframe_state = t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client; t->writing.is_client = is_client;
grpc_connectivity_state_init(&t->channel_callback.state_tracker, grpc_connectivity_state_init(&t->channel_callback.state_tracker, workqueue,
GRPC_CHANNEL_READY, "transport"); GRPC_CHANNEL_READY, "transport");
gpr_slice_buffer_init(&t->global.qbuf); gpr_slice_buffer_init(&t->global.qbuf);
@ -1280,9 +1280,9 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
grpc_transport *grpc_create_chttp2_transport( grpc_transport *grpc_create_chttp2_transport(
const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx,
int is_client) { grpc_workqueue *workqueue, int is_client) {
grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
init_transport(t, channel_args, ep, mdctx, is_client != 0); init_transport(t, channel_args, ep, mdctx, workqueue, is_client != 0);
return &t->base; return &t->base;
} }

@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H #define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H
#include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/workqueue.h"
#include "src/core/transport/transport.h" #include "src/core/transport/transport.h"
extern int grpc_http_trace; extern int grpc_http_trace;
@ -42,7 +43,7 @@ extern int grpc_flowctl_trace;
grpc_transport *grpc_create_chttp2_transport( grpc_transport *grpc_create_chttp2_transport(
const grpc_channel_args *channel_args, grpc_endpoint *ep, const grpc_channel_args *channel_args, grpc_endpoint *ep,
grpc_mdctx *metadata_context, int is_client); grpc_mdctx *metadata_context, grpc_workqueue *workqueue, int is_client);
void grpc_chttp2_transport_start_reading(grpc_transport *transport, void grpc_chttp2_transport_start_reading(grpc_transport *transport,
gpr_slice *slices, size_t nslices); gpr_slice *slices, size_t nslices);

@ -56,6 +56,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
} }
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
grpc_workqueue *workqueue,
grpc_connectivity_state init_state, grpc_connectivity_state init_state,
const char *name) { const char *name) {
tracker->current_state = init_state; tracker->current_state = init_state;
@ -64,16 +65,18 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
} }
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
int success;
grpc_connectivity_state_watcher *w; grpc_connectivity_state_watcher *w;
while ((w = tracker->watchers)) { while ((w = tracker->watchers)) {
tracker->watchers = w->next; tracker->watchers = w->next;
if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) { if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) {
*w->current = GRPC_CHANNEL_FATAL_FAILURE; *w->current = GRPC_CHANNEL_FATAL_FAILURE;
grpc_iomgr_add_callback(w->notify); success = 1;
} else { } else {
grpc_iomgr_add_delayed_callback(w->notify, 0); success = 0;
} }
grpc_workqueue_push(tracker->workqueue, w->notify, success);
gpr_free(w); gpr_free(w);
} }
gpr_free(tracker->name); gpr_free(tracker->name);
@ -94,7 +97,7 @@ int grpc_connectivity_state_notify_on_state_change(
} }
if (tracker->current_state != *current) { if (tracker->current_state != *current) {
*current = tracker->current_state; *current = tracker->current_state;
grpc_iomgr_add_callback(notify); grpc_workqueue_push(tracker->workqueue, notify, 1);
} else { } else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current; w->current = current;
@ -136,13 +139,13 @@ void grpc_connectivity_state_set_with_scheduler(
tracker->watchers = new; tracker->watchers = new;
} }
static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) { static void default_scheduler(void *workqueue, grpc_iomgr_closure *closure) {
grpc_iomgr_add_callback(closure); grpc_workqueue_push(workqueue, closure, 1);
} }
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state, grpc_connectivity_state state,
const char *reason) { const char *reason) {
grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler,
NULL, reason); tracker->workqueue, reason);
} }

@ -36,6 +36,7 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/workqueue.h"
typedef struct grpc_connectivity_state_watcher { typedef struct grpc_connectivity_state_watcher {
/** we keep watchers in a linked list */ /** we keep watchers in a linked list */
@ -53,11 +54,14 @@ typedef struct {
grpc_connectivity_state_watcher *watchers; grpc_connectivity_state_watcher *watchers;
/** a name to help debugging */ /** a name to help debugging */
char *name; char *name;
/** workqueue for async work */
grpc_workqueue *workqueue;
} grpc_connectivity_state_tracker; } grpc_connectivity_state_tracker;
extern int grpc_connectivity_state_trace; extern int grpc_connectivity_state_trace;
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
grpc_workqueue *grpc_workqueue,
grpc_connectivity_state init_state, grpc_connectivity_state init_state,
const char *name); const char *name);
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);

@ -65,12 +65,13 @@ static void done_write(void *arg, int success) {
} }
static void server_setup_transport(void *ts, grpc_transport *transport, static void server_setup_transport(void *ts, grpc_transport *transport,
grpc_mdctx *mdctx) { grpc_mdctx *mdctx,
grpc_workqueue *workqueue) {
thd_args *a = ts; thd_args *a = ts;
static grpc_channel_filter const *extra_filters[] = { static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter}; &grpc_http_server_filter};
grpc_server_setup_transport(a->server, transport, extra_filters, grpc_server_setup_transport(a->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx, GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
grpc_server_get_channel_args(a->server)); grpc_server_get_channel_args(a->server));
} }

Loading…
Cancel
Save