diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 2624fcdd538..6fefdec2f64 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -670,8 +670,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, - "client_channel"); + grpc_connectivity_state_init(&chand->state_tracker, + grpc_channel_get_workqueue(master), + GRPC_CHANNEL_IDLE, "client_channel"); } /* Destructor for channel_data */ diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index 39f34679901..bdaeda86aee 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -57,6 +57,8 @@ typedef struct { const grpc_channel_args *channel_args; /** metadata context */ grpc_mdctx *metadata_context; + /** workqueue */ + grpc_workqueue *workqueue; } grpc_connect_in_args; typedef struct { diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 151b6f12f88..06186403e53 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -332,8 +332,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, p->num_subchannels = args->num_subchannels; p->workqueue = args->workqueue; grpc_workqueue_ref(p->workqueue); - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "pick_first"); + grpc_connectivity_state_init(&p->state_tracker, args->workqueue, + GRPC_CHANNEL_IDLE, "pick_first"); memcpy(p->subchannels, args->subchannels, sizeof(grpc_subchannel *) * args->num_subchannels); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 2594e6fae95..bc042037448 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -61,6 +61,8 @@ typedef struct { grpc_subchannel_factory *subchannel_factory; /** load balancing policy name */ char *lb_policy_name; + /** work queue */ + grpc_workqueue *workqueue; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -108,7 +110,7 @@ static void zookeeper_shutdown(grpc_resolver *resolver) { gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; } zookeeper_close(r->zookeeper_handle); @@ -409,7 +411,7 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { if (r->resolved_config != NULL) { grpc_client_config_ref(r->resolved_config); } - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; r->published_version = r->resolved_version; } @@ -422,19 +424,19 @@ static void zookeeper_destroy(grpc_resolver *gr) { grpc_client_config_unref(r->resolved_config); } grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_workqueue_unref(r->workqueue); gpr_free(r->name); gpr_free(r->lb_policy_name); gpr_free(r); } -static grpc_resolver *zookeeper_create( - grpc_uri *uri, const char *lb_policy_name, - grpc_subchannel_factory *subchannel_factory) { +static grpc_resolver *zookeeper_create(grpc_resolver_args *args, + const char *lb_policy_name) { zookeeper_resolver *r; size_t length; - char *path = uri->path; + char *path = args->uri->path; - if (0 == strcmp(uri->authority, "")) { + if (0 == strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "No authority specified in zookeeper uri"); return NULL; } @@ -452,14 +454,19 @@ static grpc_resolver *zookeeper_create( grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); r->name = gpr_strdup(path); - r->subchannel_factory = subchannel_factory; + r->workqueue = args->workqueue; + grpc_workqueue_ref(r->workqueue); + + r->subchannel_factory = args->subchannel_factory; + grpc_subchannel_factory_ref(r->subchannel_factory); + r->lb_policy_name = gpr_strdup(lb_policy_name); - grpc_subchannel_factory_ref(subchannel_factory); /** Initializes zookeeper client */ zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); - r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher, - GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); + r->zookeeper_handle = + zookeeper_init(args->uri->authority, zookeeper_global_watcher, + GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); if (r->zookeeper_handle == NULL) { gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); return NULL; @@ -490,9 +497,8 @@ static char *zookeeper_factory_get_default_hostname( } static grpc_resolver *zookeeper_factory_create_resolver( - grpc_resolver_factory *factory, grpc_uri *uri, - grpc_subchannel_factory *subchannel_factory) { - return zookeeper_create(uri, "pick_first", subchannel_factory); + grpc_resolver_factory *factory, grpc_resolver_args *args) { + return zookeeper_create(args, "pick_first"); } static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 0718ffbb8c9..82212d2c6b4 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -260,6 +260,7 @@ static void subchannel_destroy(grpc_subchannel *c) { grpc_mdctx_unref(c->mdctx); grpc_connectivity_state_destroy(&c->state_tracker); grpc_connector_unref(c->connector); + grpc_workqueue_unref(c->workqueue); gpr_free(c); } @@ -296,12 +297,14 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->args = grpc_channel_args_copy(args->args); c->mdctx = args->mdctx; c->master = args->master; + c->workqueue = grpc_channel_get_workqueue(c->master); + grpc_workqueue_ref(c->workqueue); c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->random = random_seed(); grpc_mdctx_ref(c->mdctx); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); - grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, - "subchannel"); + grpc_connectivity_state_init(&c->state_tracker, c->workqueue, + GRPC_CHANNEL_IDLE, "subchannel"); gpr_mu_init(&c->mu); return c; } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 1e38479eb16..4bfe3cf9738 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -65,6 +65,7 @@ typedef struct { gpr_slice_buffer outgoing; grpc_iomgr_closure on_read; grpc_iomgr_closure done_write; + grpc_workqueue *workqueue; } internal_request; static grpc_httpcli_get_override g_get_override = NULL; @@ -105,6 +106,7 @@ static void finish(internal_request *req, int success) { grpc_iomgr_unregister_object(&req->iomgr_obj); gpr_slice_buffer_destroy(&req->incoming); gpr_slice_buffer_destroy(&req->outgoing); + grpc_workqueue_unref(req->workqueue); gpr_free(req); } @@ -202,8 +204,8 @@ static void next_address(internal_request *req) { } addr = &req->addresses->addrs[req->next_address++]; grpc_tcp_client_connect(on_connected, req, &req->context->pollset_set, - (struct sockaddr *)&addr->addr, addr->len, - req->deadline); + req->workqueue, (struct sockaddr *)&addr->addr, + addr->len, req->deadline); } static void on_resolved(void *arg, grpc_resolved_addresses *addresses) { @@ -217,19 +219,16 @@ static void on_resolved(void *arg, grpc_resolved_addresses *addresses) { next_address(req); } -void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, - const grpc_httpcli_request *request, - gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { - internal_request *req; - char *name; - if (g_get_override && - g_get_override(request, deadline, on_response, user_data)) { - return; - } - req = gpr_malloc(sizeof(internal_request)); +static void internal_request_begin(grpc_httpcli_context *context, + grpc_pollset *pollset, + const grpc_httpcli_request *request, + gpr_timespec deadline, + grpc_httpcli_response_cb on_response, + void *user_data, const char *name, + gpr_slice request_text) { + internal_request *req = gpr_malloc(sizeof(internal_request)); memset(req, 0, sizeof(*req)); - req->request_text = grpc_httpcli_format_get_request(request); + req->request_text = request_text; grpc_httpcli_parser_init(&req->parser); req->on_response = on_response; req->user_data = user_data; @@ -242,51 +241,47 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, grpc_iomgr_closure_init(&req->done_write, done_write, req); gpr_slice_buffer_init(&req->incoming); gpr_slice_buffer_init(&req->outgoing); - gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); grpc_iomgr_register_object(&req->iomgr_obj, name); - gpr_free(name); req->host = gpr_strdup(request->host); + req->workqueue = grpc_workqueue_create(); + grpc_workqueue_add_to_pollset(req->workqueue, pollset); grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset); grpc_resolve_address(request->host, req->handshaker->default_port, on_resolved, req); } +void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, + const grpc_httpcli_request *request, + gpr_timespec deadline, + grpc_httpcli_response_cb on_response, void *user_data) { + char *name; + if (g_get_override && + g_get_override(request, deadline, on_response, user_data)) { + return; + } + gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); + internal_request_begin(context, pollset, request, deadline, on_response, + user_data, name, + grpc_httpcli_format_get_request(request)); + gpr_free(name); +} + void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { - internal_request *req; char *name; if (g_post_override && g_post_override(request, body_bytes, body_size, deadline, on_response, user_data)) { return; } - req = gpr_malloc(sizeof(internal_request)); - memset(req, 0, sizeof(*req)); - req->request_text = - grpc_httpcli_format_post_request(request, body_bytes, body_size); - grpc_httpcli_parser_init(&req->parser); - req->on_response = on_response; - req->user_data = user_data; - req->deadline = deadline; - req->handshaker = - request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; - req->context = context; - req->pollset = pollset; - grpc_iomgr_closure_init(&req->on_read, on_read, req); - grpc_iomgr_closure_init(&req->done_write, done_write, req); - gpr_slice_buffer_init(&req->incoming); - gpr_slice_buffer_init(&req->outgoing); gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path); - grpc_iomgr_register_object(&req->iomgr_obj, name); + internal_request_begin( + context, pollset, request, deadline, on_response, user_data, name, + grpc_httpcli_format_post_request(request, body_bytes, body_size)); gpr_free(name); - req->host = gpr_strdup(request->host); - - grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset); - grpc_resolve_address(request->host, req->handshaker->default_port, - on_resolved, req); } void grpc_httpcli_set_override(grpc_httpcli_get_override get, diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c3668f6a920..8b1a3b0f9e3 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -195,6 +195,7 @@ finish: void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), void *arg, grpc_pollset_set *interested_parties, + grpc_workqueue *workqueue, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { int fd; @@ -236,7 +237,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), addr_str = grpc_sockaddr_to_uri(addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - fdobj = grpc_fd_create(fd, name); + fdobj = grpc_fd_create(fd, workqueue, name); if (err >= 0) { cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 68f469c3681..c539cf2d348 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -261,7 +261,7 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, tcp->finished_edge = 0; grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { - grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1); + grpc_workqueue_push(tcp->em_fd->workqueue, &tcp->read_closure, 1); } /* TODO(ctiller): immediate return */ return GRPC_ENDPOINT_PENDING; diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index bcbd0afe6b9..02d37350f75 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -124,6 +124,9 @@ struct grpc_tcp_server { grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; + + /** workqueue for interally created async work */ + grpc_workqueue *workqueue; }; grpc_tcp_server *grpc_tcp_server_create(void) { @@ -137,6 +140,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; + s->workqueue = grpc_workqueue_create(); return s; } @@ -147,6 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) { gpr_mu_destroy(&s->mu); gpr_free(s->ports); + grpc_workqueue_unref(s->workqueue); gpr_free(s); } @@ -339,7 +344,7 @@ static void on_read(void *arg, int success) { addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); - fdobj = grpc_fd_create(fd, name); + fdobj = grpc_fd_create(fd, sp->server->workqueue, name); /* TODO(ctiller): revise this when we have server-side sharding of channels -- we certainly should not be automatically adding every incoming channel to every pollset owned by the server */ @@ -387,7 +392,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, s->workqueue, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; GPR_ASSERT(sp->emfd); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index ed9eee8726b..d4e8e99bce1 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -121,6 +121,8 @@ struct grpc_udp_server { grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; + + grpc_workqueue *workqueue; }; grpc_udp_server *grpc_udp_server_create(void) { @@ -135,6 +137,7 @@ grpc_udp_server *grpc_udp_server_create(void) { s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; + s->workqueue = grpc_workqueue_create(); return s; } @@ -146,6 +149,7 @@ static void finish_shutdown(grpc_udp_server *s) { gpr_cv_destroy(&s->cv); gpr_free(s->ports); + grpc_workqueue_unref(s->workqueue); gpr_free(s); } @@ -310,7 +314,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, s->workqueue, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; sp->read_cb = read_cb; diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index ef1598c711e..26626bef3bc 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -35,6 +35,7 @@ #ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/workqueue.h" #include @@ -52,8 +53,9 @@ grpc_workqueue *grpc_workqueue_create(void) { workqueue->tail = &workqueue->head; grpc_wakeup_fd_init(&workqueue->wakeup_fd); sprintf(name, "workqueue:%p", (void *)workqueue); - workqueue->wakeup_read_fd = - grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); + workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */ + workqueue->wakeup_read_fd = grpc_fd_create( + GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name); grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue); grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); return workqueue; diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 4749f5f5165..a6f50712f54 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -85,7 +85,7 @@ static void state_unref(grpc_server_secure_state *state) { } static void setup_transport(void *statep, grpc_transport *transport, - grpc_mdctx *mdctx) { + grpc_mdctx *mdctx, grpc_workqueue *workqueue) { static grpc_channel_filter const *extra_filters[] = { &grpc_server_auth_filter, &grpc_http_server_filter}; grpc_server_secure_state *state = statep; @@ -98,7 +98,8 @@ static void setup_transport(void *statep, grpc_transport *transport, grpc_server_get_channel_args(state->server), args_to_add, GPR_ARRAY_SIZE(args_to_add)); grpc_server_setup_transport(state->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, args_copy); + GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, + args_copy); grpc_channel_args_destroy(args_copy); } @@ -130,15 +131,17 @@ static void on_secure_transport_setup_done(void *statep, grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; + grpc_workqueue *workqueue; if (status == GRPC_SECURITY_OK) { gpr_mu_lock(&state->mu); remove_tcp_from_list_locked(state, wrapped_endpoint); if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); + workqueue = grpc_workqueue_create(); transport = grpc_create_chttp2_transport( grpc_server_get_channel_args(state->server), secure_endpoint, mdctx, - 0); - setup_transport(state, transport, mdctx); + workqueue, 0); + setup_transport(state, transport, mdctx, workqueue); grpc_chttp2_transport_start_reading(transport, NULL, 0); } else { /* We need to consume this here, because the server may already have gone diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 4168c2ef0cc..c2b30403191 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -499,7 +499,8 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { } else { c->destroy_closure.cb = destroy_call; c->destroy_closure.cb_arg = c; - grpc_iomgr_add_callback(&c->destroy_closure); + grpc_workqueue_push(grpc_channel_get_workqueue(c->channel), + &c->destroy_closure, 1); } } } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a89523b3ab6..bf4aee190fd 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -79,6 +79,7 @@ struct grpc_channel { registered_call *registered_calls; grpc_iomgr_closure destroy_closure; char *target; + grpc_workqueue *workqueue; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -92,7 +93,8 @@ struct grpc_channel { grpc_channel *grpc_channel_create_from_filters( const char *target, const grpc_channel_filter **filters, size_t num_filters, - const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue, + int is_client) { size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); @@ -104,6 +106,7 @@ grpc_channel *grpc_channel_create_from_filters( /* decremented by grpc_channel_destroy */ gpr_ref_init(&channel->refs, 1); channel->metadata_context = mdctx; + channel->workqueue = workqueue; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0); channel->grpc_compression_algorithm_string = grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); @@ -311,7 +314,7 @@ void grpc_channel_internal_unref(grpc_channel *channel) { if (gpr_unref(&channel->refs)) { channel->destroy_closure.cb = destroy_channel; channel->destroy_closure.cb_arg = channel; - grpc_iomgr_add_callback(&channel->destroy_closure); + grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1); } } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 593faec7df8..9fc821d64b2 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -40,7 +40,8 @@ grpc_channel *grpc_channel_create_from_filters( const char *target, const grpc_channel_filter **filters, size_t count, - const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); + const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue, + int is_client); /** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 88a7c165985..12b15f353f4 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -176,7 +176,8 @@ void grpc_channel_watch_connectivity_state( "grpc_channel_watch_connectivity_state called on something that is " "not a client channel, but '%s'", client_channel_elem->filter->name); - grpc_iomgr_add_delayed_callback(&w->on_complete, 1); + grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete, + 1); } else { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); grpc_client_channel_add_interested_party(client_channel_elem, diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 9e2cf1cf66e..7a4ec00abb7 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -74,7 +74,8 @@ static void connected(void *arg, grpc_endpoint *tcp) { grpc_iomgr_closure *notify; if (tcp != NULL) { c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, tcp, c->args.metadata_context, 1); + c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue, + 1); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); GPR_ASSERT(c->result->transport); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); @@ -85,7 +86,7 @@ static void connected(void *arg, grpc_endpoint *tcp) { } notify = c->notify; c->notify = NULL; - grpc_iomgr_add_callback(notify); + notify->cb(notify->cb_arg, 1); } static void connector_connect(grpc_connector *con, @@ -98,8 +99,9 @@ static void connector_connect(grpc_connector *con, c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, - args->addr_len, args->deadline); + grpc_tcp_client_connect(connected, c, args->interested_parties, + args->workqueue, args->addr, args->addr_len, + args->deadline); } static const grpc_connector_vtable connector_vtable = { @@ -164,6 +166,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_resolver *resolver; subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_workqueue *workqueue = grpc_workqueue_create(); size_t n = 0; GPR_ASSERT(!reserved); if (grpc_channel_args_is_census_enabled(args)) { @@ -173,8 +176,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - channel = - grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1); + channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx, + workqueue, 1); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; @@ -184,7 +187,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, f->merge_args = grpc_channel_args_copy(args); f->master = channel; GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); - resolver = grpc_resolver_create(target, &f->base); + resolver = grpc_resolver_create(target, &f->base, workqueue); if (!resolver) { return NULL; } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 80704cbf677..a5de900effe 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -150,7 +150,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, channel_data *chand; static const grpc_channel_filter *filters[] = {&lame_filter}; channel = grpc_channel_create_from_filters(target, filters, 1, NULL, - grpc_mdctx_create(), 1); + grpc_mdctx_create(), + grpc_workqueue_create(), 1); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); GPR_ASSERT(elem->filter == &lame_filter); chand = (channel_data *)elem->channel_data; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index d1412604216..ec077af8dd2 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -57,7 +57,6 @@ typedef struct { gpr_refcount refs; grpc_channel_security_connector *security_connector; - grpc_workqueue *workqueue; grpc_iomgr_closure *notify; grpc_connect_in_args args; @@ -72,7 +71,6 @@ static void connector_ref(grpc_connector *con) { static void connector_unref(grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { - grpc_workqueue_unref(c->workqueue); gpr_free(c); } } @@ -88,7 +86,8 @@ static void on_secure_transport_setup_done(void *arg, memset(c->result, 0, sizeof(*c->result)); } else { c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); + c->args.channel_args, secure_endpoint, c->args.metadata_context, + c->args.workqueue, 1); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); c->result->filters[0] = &grpc_http_client_filter; @@ -124,8 +123,9 @@ static void connector_connect(grpc_connector *con, c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue, - args->addr, args->addr_len, args->deadline); + grpc_tcp_client_connect(connected, c, args->interested_parties, + args->workqueue, args->addr, args->addr_len, + args->deadline); } static const grpc_connector_vtable connector_vtable = { @@ -167,8 +167,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; - c->workqueue = grpc_channel_get_workqueue(f->master); - grpc_workqueue_ref(c->workqueue); gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; @@ -197,6 +195,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, grpc_channel_args *new_args_from_connector; grpc_channel_security_connector *connector; grpc_mdctx *mdctx; + grpc_workqueue *workqueue; grpc_resolver *resolver; subchannel_factory *f; #define MAX_FILTERS 3 @@ -219,6 +218,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, "Failed to create security connector."); } mdctx = grpc_mdctx_create(); + workqueue = grpc_workqueue_create(); connector_arg = grpc_security_connector_to_arg(&connector->base); args_copy = grpc_channel_args_copy_and_add( @@ -231,8 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - channel = - grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1); + channel = grpc_channel_create_from_filters(target, filters, n, args_copy, + mdctx, workqueue, 1); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; @@ -244,8 +244,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, f->merge_args = grpc_channel_args_copy(args_copy); f->master = channel; GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); - resolver = grpc_resolver_create(target, &f->base, - grpc_channel_get_workqueue(channel)); + resolver = grpc_resolver_create(target, &f->base, workqueue); if (!resolver) { return NULL; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 3d404f78a4f..aba0f94fd48 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -219,6 +219,8 @@ struct grpc_server { /** when did we print the last shutdown progress message */ gpr_timespec last_shutdown_message_time; + + grpc_workqueue *workqueue; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -314,7 +316,7 @@ static void kill_zombie(void *elem, int success) { } static void request_matcher_zombify_all_pending_calls( - request_matcher *request_matcher) { + request_matcher *request_matcher, grpc_workqueue *workqueue) { while (request_matcher->pending_head) { call_data *calld = request_matcher->pending_head; request_matcher->pending_head = calld->pending_next; @@ -324,7 +326,7 @@ static void request_matcher_zombify_all_pending_calls( grpc_iomgr_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1); } } @@ -363,6 +365,7 @@ static void server_delete(grpc_server *server) { } request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); + grpc_workqueue_unref(server->workqueue); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -401,7 +404,8 @@ static void destroy_channel(channel_data *chand) { maybe_finish_shutdown(chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure); + grpc_workqueue_push(chand->server->workqueue, + &chand->finish_destroy_channel_closure, 1); } static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, @@ -414,7 +418,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); return; } @@ -505,10 +509,11 @@ static void kill_pending_work_locked(grpc_server *server) { registered_method *rm; request_matcher_kill_requests(server, &server->unregistered_request_matcher); request_matcher_zombify_all_pending_calls( - &server->unregistered_request_matcher); + &server->unregistered_request_matcher, server->workqueue); for (rm = server->registered_methods; rm; rm = rm->next) { request_matcher_kill_requests(server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(&rm->request_matcher); + request_matcher_zombify_all_pending_calls(&rm->request_matcher, + server->workqueue); } } @@ -561,6 +566,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; gpr_timespec op_deadline; if (success && !calld->got_initial_metadata) { @@ -595,7 +601,8 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(chand->server->workqueue, + &calld->kill_zombie_closure, 1); } else { gpr_mu_unlock(&calld->mu_state); } @@ -606,7 +613,8 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(chand->server->workqueue, + &calld->kill_zombie_closure, 1); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -799,6 +807,7 @@ grpc_server *grpc_server_create_from_filters( gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + server->workqueue = grpc_workqueue_create(); /* TODO(ctiller): expose a channel_arg for this */ server->max_requested_calls = 32768; @@ -873,6 +882,7 @@ void grpc_server_start(grpc_server *server) { server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); for (i = 0; i < server->cq_count; i++) { server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); + grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]); } for (l = server->listeners; l; l = l->next) { @@ -883,6 +893,7 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, + grpc_workqueue *workqueue, const grpc_channel_args *args) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = @@ -917,7 +928,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, } channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, - mdctx, 0); + mdctx, workqueue, 0); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; @@ -1119,7 +1130,7 @@ static grpc_call_error queue_call_request(grpc_server *server, grpc_iomgr_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; diff --git a/src/core/surface/server.h b/src/core/surface/server.h index c638d682bb1..1d82d07cedc 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -58,6 +58,7 @@ void grpc_server_listener_destroy_done(void *server); void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, + grpc_workqueue *workqueue, const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 4ab845bc008..91cf6ece9cd 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -43,11 +43,11 @@ #include static void setup_transport(void *server, grpc_transport *transport, - grpc_mdctx *mdctx) { + grpc_mdctx *mdctx, grpc_workqueue *workqueue) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, + GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, grpc_server_get_channel_args(server)); } @@ -60,9 +60,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) { * case. */ grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_workqueue *workqueue = grpc_workqueue_create(); grpc_transport *transport = grpc_create_chttp2_transport( - grpc_server_get_channel_args(server), tcp, mdctx, 0); - setup_transport(server, transport, mdctx); + grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0); + setup_transport(server, transport, mdctx, workqueue); grpc_chttp2_transport_start_reading(transport, NULL, 0); } diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c index 05451c7a8ad..10d1e0a523c 100644 --- a/src/core/transport/chttp2/frame_ping.c +++ b/src/core/transport/chttp2/frame_ping.c @@ -89,7 +89,9 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( for (ping = transport_parsing->pings.next; ping != &transport_parsing->pings; ping = ping->next) { if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) { - grpc_iomgr_add_delayed_callback(ping->on_recv, 1); + /* we know no locks are held here, we may as well just call up + * directly */ + ping->on_recv->cb(ping->on_recv->cb_arg, 1); } ping->next->prev = ping->prev; ping->prev->next = ping->next; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index deb2fedf0c4..705a025ccaa 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -166,7 +166,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_iomgr_add_delayed_callback(ping->on_recv, 0); + ping->on_recv->cb(ping->on_recv->cb_arg, 0); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -209,7 +209,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_chttp2_transport *t, const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, - gpr_uint8 is_client) { + grpc_workqueue *workqueue, gpr_uint8 is_client) { size_t i; int j; @@ -242,7 +242,7 @@ static void init_transport(grpc_chttp2_transport *t, t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; - grpc_connectivity_state_init(&t->channel_callback.state_tracker, + grpc_connectivity_state_init(&t->channel_callback.state_tracker, workqueue, GRPC_CHANNEL_READY, "transport"); gpr_slice_buffer_init(&t->global.qbuf); @@ -1280,9 +1280,9 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, - int is_client) { + grpc_workqueue *workqueue, int is_client) { grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); - init_transport(t, channel_args, ep, mdctx, is_client != 0); + init_transport(t, channel_args, ep, mdctx, workqueue, is_client != 0); return &t->base; } diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h index fa0d6e4151e..8bd8af6236c 100644 --- a/src/core/transport/chttp2_transport.h +++ b/src/core/transport/chttp2_transport.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/workqueue.h" #include "src/core/transport/transport.h" extern int grpc_http_trace; @@ -42,7 +43,7 @@ extern int grpc_flowctl_trace; grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, - grpc_mdctx *metadata_context, int is_client); + grpc_mdctx *metadata_context, grpc_workqueue *workqueue, int is_client); void grpc_chttp2_transport_start_reading(grpc_transport *transport, gpr_slice *slices, size_t nslices); diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 61d26f06f09..716280505e8 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -56,6 +56,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) { } void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, + grpc_workqueue *workqueue, grpc_connectivity_state init_state, const char *name) { tracker->current_state = init_state; @@ -64,16 +65,18 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, } void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { + int success; grpc_connectivity_state_watcher *w; while ((w = tracker->watchers)) { tracker->watchers = w->next; if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) { *w->current = GRPC_CHANNEL_FATAL_FAILURE; - grpc_iomgr_add_callback(w->notify); + success = 1; } else { - grpc_iomgr_add_delayed_callback(w->notify, 0); + success = 0; } + grpc_workqueue_push(tracker->workqueue, w->notify, success); gpr_free(w); } gpr_free(tracker->name); @@ -94,7 +97,7 @@ int grpc_connectivity_state_notify_on_state_change( } if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_iomgr_add_callback(notify); + grpc_workqueue_push(tracker->workqueue, notify, 1); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -136,13 +139,13 @@ void grpc_connectivity_state_set_with_scheduler( tracker->watchers = new; } -static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) { - grpc_iomgr_add_callback(closure); +static void default_scheduler(void *workqueue, grpc_iomgr_closure *closure) { + grpc_workqueue_push(workqueue, closure, 1); } void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, const char *reason) { grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler, - NULL, reason); + tracker->workqueue, reason); } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index a3b0b80c986..6c61e026234 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -36,6 +36,7 @@ #include #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/workqueue.h" typedef struct grpc_connectivity_state_watcher { /** we keep watchers in a linked list */ @@ -53,11 +54,14 @@ typedef struct { grpc_connectivity_state_watcher *watchers; /** a name to help debugging */ char *name; + /** workqueue for async work */ + grpc_workqueue *workqueue; } grpc_connectivity_state_tracker; extern int grpc_connectivity_state_trace; void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, + grpc_workqueue *grpc_workqueue, grpc_connectivity_state init_state, const char *name); void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 41ac83b7b70..b1d3479fa5b 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -65,12 +65,13 @@ static void done_write(void *arg, int success) { } static void server_setup_transport(void *ts, grpc_transport *transport, - grpc_mdctx *mdctx) { + grpc_mdctx *mdctx, + grpc_workqueue *workqueue) { thd_args *a = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(a->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, + GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, grpc_server_get_channel_args(a->server)); }