pull/3423/head
Craig Tiller 9 years ago
parent 000cd8f9f7
commit 33825118df
  1. src/core/census/grpc_filter.c (4)
  2. src/core/channel/client_channel.c (79)
  3. src/core/channel/client_channel.h (5)
  4. src/core/channel/http_client_filter.c (6)
  5. src/core/channel/http_server_filter.c (6)
  6. src/core/client_config/connector.c (2)
  7. src/core/client_config/connector.h (4)
  8. src/core/client_config/lb_policies/pick_first.c (33)
  9. src/core/client_config/lb_policy.c (17)
  10. src/core/client_config/lb_policy.h (35)
  11. src/core/client_config/resolver.c (2)
  12. src/core/client_config/resolver.h (4)
  13. src/core/client_config/resolvers/dns_resolver.c (18)
  14. src/core/client_config/resolvers/sockaddr_resolver.c (19)
  15. src/core/client_config/resolvers/zookeeper_resolver.c (21)
  16. src/core/client_config/subchannel.c (49)
  17. src/core/client_config/subchannel.h (8)
  18. src/core/httpcli/httpcli.c (8)
  19. src/core/iomgr/endpoint.c (4)
  20. src/core/iomgr/endpoint.h (8)
  21. src/core/iomgr/fd_posix.c (21)
  22. src/core/iomgr/fd_posix.h (11)
  23. src/core/iomgr/iomgr.c (19)
  24. src/core/iomgr/iomgr.h (31)
  25. src/core/iomgr/pollset_multipoller_with_epoll.c (4)
  26. src/core/iomgr/pollset_posix.c (16)
  27. src/core/iomgr/pollset_posix.h (10)
  28. src/core/iomgr/socket_windows.h (2)
  29. src/core/iomgr/tcp_client_posix.c (2)
  30. src/core/iomgr/tcp_posix.c (16)
  31. src/core/iomgr/tcp_server_posix.c (4)
  32. src/core/iomgr/tcp_windows.c (12)
  33. src/core/iomgr/udp_server.c (4)
  34. src/core/iomgr/workqueue.h (4)
  35. src/core/iomgr/workqueue_posix.c (97)
  36. src/core/iomgr/workqueue_posix.h (5)
  37. src/core/security/secure_endpoint.c (12)
  38. src/core/security/secure_transport_setup.c (12)
  39. src/core/security/server_auth_filter.c (6)
  40. src/core/surface/call.c (19)
  41. src/core/surface/channel_connectivity.c (14)
  42. src/core/surface/channel_create.c (6)
  43. src/core/surface/secure_channel_create.c (8)
  44. src/core/surface/server.c (32)
  45. src/core/transport/chttp2/internal.h (14)
  46. src/core/transport/chttp2/writing.c (4)
  47. src/core/transport/chttp2_transport.c (36)
  48. src/core/transport/connectivity_state.c (8)
  49. src/core/transport/connectivity_state.h (7)
  50. src/core/transport/transport.c (6)
  51. src/core/transport/transport.h (14)
  52. test/core/bad_client/bad_client.c (4)
  53. test/core/end2end/fixtures/h2_sockpair+trace.c (2)
  54. test/core/end2end/fixtures/h2_sockpair.c (2)
  55. test/core/end2end/fixtures/h2_sockpair_1byte.c (2)
  56. test/core/iomgr/endpoint_tests.c (11)
  57. test/core/iomgr/fd_posix_test.c (10)
  58. test/core/iomgr/tcp_posix_test.c (10)
  59. test/core/iomgr/workqueue_test.c (4)

@ -54,7 +54,7 @@ typedef struct call_data {
/* recv callback */
grpc_stream_op_buffer* recv_ops;
grpc_iomgr_closure* on_done_recv;
grpc_closure* on_done_recv;
} call_data;
typedef struct channel_data {
@ -145,7 +145,7 @@ static void server_init_call_elem(grpc_call_element* elem,
GPR_ASSERT(d != NULL);
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
/* TODO(hongyu): call census_tracing_start_op here. */
grpc_iomgr_closure_init(d->on_done_recv, server_on_done_recv, elem);
grpc_closure_init(d->on_done_recv, server_on_done_recv, elem);
if (initial_op) server_mutate_op(elem, initial_op);
}

@ -73,9 +73,9 @@ typedef struct {
guarded by mu_config */
grpc_client_config *incoming_configuration;
/** a list of closures that are all waiting for config to come in */
grpc_iomgr_call_list waiting_for_config_closures;
grpc_call_list waiting_for_config_closures;
/** resolver callback */
grpc_iomgr_closure on_config_changed;
grpc_closure on_config_changed;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
/** when an lb_policy arrives, should we try to exit idle */
@ -91,7 +91,7 @@ typedef struct {
update the channel, and create a new watcher */
typedef struct {
channel_data *chand;
grpc_iomgr_closure on_changed;
grpc_closure on_changed;
grpc_connectivity_state state;
grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;
@ -115,7 +115,7 @@ struct call_data {
call_state state;
gpr_timespec deadline;
grpc_subchannel *picked_channel;
grpc_iomgr_closure async_setup_task;
grpc_closure async_setup_task;
grpc_transport_stream_op waiting_op;
/* our child call stack */
grpc_subchannel_call *subchannel_call;
@ -123,9 +123,9 @@ struct call_data {
grpc_linked_mdelem details;
};
static grpc_iomgr_closure *merge_into_waiting_op(
grpc_call_element *elem,
grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op)
GRPC_MUST_USE_RESULT;
static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_stream_op *op) {
@ -160,7 +160,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
}
typedef struct {
grpc_iomgr_closure closure;
grpc_closure closure;
grpc_call_element *elem;
} waiting_call;
@ -179,10 +179,9 @@ static void add_to_lb_policy_wait_queue_locked_state_config(
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
waiting_call *wc = gpr_malloc(sizeof(*wc));
grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
grpc_closure_init(&wc->closure, continue_with_pick, wc);
wc->elem = elem;
grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure,
1);
grpc_call_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
}
static int is_empty(void *p, int len) {
@ -230,7 +229,7 @@ static void started_call(void *arg, int iomgr_success) {
static void picked_target(void *arg, int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
@ -246,19 +245,19 @@ static void picked_target(void *arg, int iomgr_success) {
calld->state = CALL_WAITING_FOR_CALL;
pollset = calld->waiting_op.bind_pollset;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
grpc_closure_init(&calld->async_setup_task, started_call, calld);
grpc_subchannel_create_call(calld->picked_channel, pollset,
&calld->subchannel_call,
&calld->async_setup_task, &call_list);
}
}
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
}
static grpc_iomgr_closure *merge_into_waiting_op(
grpc_call_element *elem, grpc_transport_stream_op *new_op) {
static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
grpc_iomgr_closure *consumed_op = NULL;
grpc_closure *consumed_op = NULL;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
@ -312,7 +311,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@ -330,7 +329,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
case CALL_WAITING_FOR_SEND:
GPR_ASSERT(!continuation);
grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
grpc_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
if (!calld->waiting_op.send_ops &&
calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
gpr_mu_unlock(&calld->mu_state);
@ -359,8 +358,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
handle_op_after_cancellation(elem, &op2);
} else {
grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op),
1);
grpc_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
gpr_mu_unlock(&calld->mu_state);
}
break;
@ -397,8 +395,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->async_setup_task, picked_target,
calld);
grpc_closure_init(&calld->async_setup_task, picked_target, calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
&calld->picked_channel,
&calld->async_setup_task, &call_list);
@ -427,7 +424,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
}
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
}
static void cc_start_transport_stream_op(grpc_call_element *elem,
@ -437,10 +434,10 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state,
grpc_iomgr_call_list *cl);
grpc_call_list *cl);
static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
grpc_iomgr_call_list *cl) {
grpc_call_list *cl) {
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
@ -453,13 +450,13 @@ static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list cl = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&w->chand->mu_config);
on_lb_policy_state_changed_locked(w, &cl);
gpr_mu_unlock(&w->chand->mu_config);
grpc_iomgr_call_list_run(cl);
grpc_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
gpr_free(w);
@ -467,12 +464,12 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state,
grpc_iomgr_call_list *call_list) {
grpc_call_list *call_list) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
w->chand = chand;
grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
@ -485,7 +482,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list cl = GRPC_CALL_LIST_INIT;
int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
@ -505,7 +502,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl);
grpc_call_list_move(&chand->waiting_for_config_closures, &cl);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@ -553,7 +550,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
}
grpc_iomgr_call_list_run(cl);
grpc_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
@ -562,10 +559,10 @@ static void cc_start_transport_op(grpc_channel_element *elem,
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
if (op->on_consumed) {
grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1);
grpc_call_list_add(&call_list, op->on_consumed, 1);
op->on_consumed = NULL;
}
@ -612,7 +609,7 @@ static void cc_start_transport_op(grpc_channel_element *elem,
GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
}
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
}
/* Constructor for call_data */
@ -677,8 +674,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand->mdctx = metadata_context;
chand->master = master;
grpc_pollset_set_init(&chand->pollset_set);
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
grpc_connectivity_state_init(&chand->state_tracker,
GRPC_CHANNEL_IDLE, "client_channel");
@ -722,7 +718,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
GRPC_RESOLVER_REF(resolver, "channel");
if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) ||
if (!grpc_call_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = 1;
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
@ -733,8 +729,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element *elem, int try_to_connect,
grpc_iomgr_call_list *call_list) {
grpc_channel_element *elem, int try_to_connect, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
@ -758,7 +753,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
grpc_closure *on_complete, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu_config);
grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,

@ -53,12 +53,11 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element *elem, int try_to_connect,
grpc_iomgr_call_list *call_list);
grpc_channel_element *elem, int try_to_connect, grpc_call_list *call_list);
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list);
grpc_closure *on_complete, grpc_call_list *call_list);
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
grpc_channel_element *elem);

@ -50,11 +50,11 @@ typedef struct call_data {
grpc_stream_op_buffer *recv_ops;
/** Closure to call when finished with the hc_on_recv hook */
grpc_iomgr_closure *on_done_recv;
grpc_closure *on_done_recv;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
grpc_iomgr_closure hc_on_recv;
grpc_closure hc_on_recv;
} call_data;
typedef struct channel_data {
@ -162,7 +162,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
calld->on_done_recv = NULL;
grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
if (initial_op) hc_mutate_op(elem, initial_op);
}

@ -50,11 +50,11 @@ typedef struct call_data {
grpc_stream_op_buffer *recv_ops;
/** Closure to call when finished with the hs_on_recv hook */
grpc_iomgr_closure *on_done_recv;
grpc_closure *on_done_recv;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
grpc_iomgr_closure hs_on_recv;
grpc_closure hs_on_recv;
} call_data;
typedef struct channel_data {
@ -232,7 +232,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_iomgr_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
if (initial_op) hs_mutate_op(elem, initial_op);
}

@ -44,7 +44,7 @@ void grpc_connector_unref(grpc_connector *connector) {
void grpc_connector_connect(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
grpc_iomgr_closure *notify) {
grpc_closure *notify) {
connector->vtable->connect(connector, in_args, out_args, notify);
}

@ -77,7 +77,7 @@ struct grpc_connector_vtable {
/** Implementation of grpc_connector_connect */
void (*connect)(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args, grpc_iomgr_closure *notify);
grpc_connect_out_args *out_args, grpc_closure *notify);
};
void grpc_connector_ref(grpc_connector *connector);
@ -86,7 +86,7 @@ void grpc_connector_unref(grpc_connector *connector);
void grpc_connector_connect(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
grpc_iomgr_closure *notify);
grpc_closure *notify);
/** Cancel any pending connection */
void grpc_connector_shutdown(grpc_connector *connector);

@ -43,7 +43,7 @@ typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
grpc_subchannel **target;
grpc_iomgr_closure *on_complete;
grpc_closure *on_complete;
} pending_pick;
typedef struct {
@ -55,7 +55,7 @@ typedef struct {
/** workqueue for async work */
grpc_workqueue *workqueue;
grpc_iomgr_closure connectivity_changed;
grpc_closure connectivity_changed;
/** mutex protecting remaining members */
gpr_mu mu;
@ -108,7 +108,7 @@ void pf_destroy(grpc_lb_policy *pol) {
gpr_free(p);
}
void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
void pf_shutdown(grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -122,14 +122,13 @@ void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
grpc_iomgr_call_list_add(call_list, pp->on_complete, 1);
grpc_call_list_add(call_list, pp->on_complete, 1);
gpr_free(pp);
pp = next;
}
}
static void start_picking(pick_first_lb_policy *p,
grpc_iomgr_call_list *call_list) {
static void start_picking(pick_first_lb_policy *p, grpc_call_list *call_list) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
@ -139,7 +138,7 @@ static void start_picking(pick_first_lb_policy *p,
&p->connectivity_changed, call_list);
}
void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
void pf_exit_idle(grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
@ -150,7 +149,7 @@ void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
grpc_closure *on_complete, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -178,7 +177,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&p->mu);
@ -205,7 +204,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
grpc_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@ -251,7 +250,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
grpc_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
unref = 1;
@ -270,7 +269,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
gpr_mu_unlock(&p->mu);
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
if (unref) {
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
@ -278,7 +277,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
}
static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
grpc_iomgr_call_list *call_list) {
grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
size_t n;
@ -301,7 +300,7 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
}
static grpc_connectivity_state pf_check_connectivity(
grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
@ -312,8 +311,8 @@ static grpc_connectivity_state pf_check_connectivity(
void pf_notify_on_state_change(grpc_lb_policy *pol,
grpc_connectivity_state *current,
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list) {
grpc_closure *notify,
grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu);
grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
@ -348,7 +347,7 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
"pick_first");
memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels);
grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
gpr_mu_init(&p->mu);
return &p->base;
}
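The pick_first hunks above all follow one discipline: completions are collected into a grpc_call_list while the policy mutex is held, and only run after the mutex is released. A minimal sketch of that pattern, assuming the grpc_closure/grpc_call_list declarations from src/core/iomgr/iomgr.h shown later in this diff; the "pending" struct and flush_pending are illustrative stand-ins for pending_pick and pf_connectivity_changed, not code from this commit.

#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>

#include "src/core/iomgr/iomgr.h" /* grpc_closure, grpc_call_list */

typedef struct pending { /* illustrative stand-in for pending_pick */
  struct pending *next;
  grpc_closure *on_complete;
} pending;

static void flush_pending(gpr_mu *mu, pending **head) {
  grpc_call_list call_list = GRPC_CALL_LIST_INIT;
  pending *pp;

  gpr_mu_lock(mu);
  while ((pp = *head) != NULL) {
    *head = pp->next;
    /* adding to the list never invokes the callback... */
    grpc_call_list_add(&call_list, pp->on_complete, 1);
    gpr_free(pp);
  }
  gpr_mu_unlock(mu);

  /* ...so every completion runs here, outside the lock, which is what lets
     pf_shutdown and pf_connectivity_changed avoid re-entrant callbacks. */
  grpc_call_list_run(call_list);
}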

@ -64,37 +64,36 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy) {
}
void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
grpc_iomgr_call_list *call_list) {
grpc_call_list *call_list) {
policy->vtable->shutdown(policy, call_list);
}
void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **target,
grpc_iomgr_closure *on_complete,
grpc_iomgr_call_list *call_list) {
grpc_subchannel **target, grpc_closure *on_complete,
grpc_call_list *call_list) {
policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete,
call_list);
}
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
grpc_iomgr_call_list *call_list) {
grpc_call_list *call_list) {
policy->vtable->broadcast(policy, op, call_list);
}
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
grpc_iomgr_call_list *call_list) {
grpc_call_list *call_list) {
policy->vtable->exit_idle(policy, call_list);
}
void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure,
grpc_iomgr_call_list *call_list) {
grpc_closure *closure,
grpc_call_list *call_list) {
policy->vtable->notify_on_state_change(policy, state, closure, call_list);
}
grpc_connectivity_state grpc_lb_policy_check_connectivity(
grpc_lb_policy *policy, grpc_iomgr_call_list *call_list) {
grpc_lb_policy *policy, grpc_call_list *call_list) {
return policy->vtable->check_connectivity(policy, call_list);
}

@ -53,31 +53,30 @@ struct grpc_lb_policy {
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_lb_policy *policy);
void (*shutdown)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
void (*shutdown)(grpc_lb_policy *policy, grpc_call_list *call_list);
/** implement grpc_lb_policy_pick */
void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete,
grpc_iomgr_call_list *call_list);
grpc_closure *on_complete, grpc_call_list *call_list);
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
void (*exit_idle)(grpc_lb_policy *policy, grpc_call_list *call_list);
/** broadcast a transport op to all subchannels */
void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op,
grpc_iomgr_call_list *call_list);
grpc_call_list *call_list);
/** check the current connectivity of the lb_policy */
grpc_connectivity_state (*check_connectivity)(
grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy,
grpc_call_list *call_list);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */
void (*notify_on_state_change)(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure,
grpc_iomgr_call_list *call_list);
grpc_closure *closure,
grpc_call_list *call_list);
};
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@ -101,8 +100,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
/** Start shutting down (fail any pending picks) */
void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
grpc_iomgr_call_list *call_list);
void grpc_lb_policy_shutdown(grpc_lb_policy *policy, grpc_call_list *call_list);
/** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting
@ -110,22 +108,21 @@ void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
Picking can be asynchronous. Any IO should be done under \a pollset. */
void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **target,
grpc_iomgr_closure *on_complete,
grpc_iomgr_call_list *call_list);
grpc_subchannel **target, grpc_closure *on_complete,
grpc_call_list *call_list);
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
grpc_iomgr_call_list *call_list);
grpc_call_list *call_list);
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
grpc_iomgr_call_list *call_list);
grpc_call_list *call_list);
void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure,
grpc_iomgr_call_list *call_list);
grpc_closure *closure,
grpc_call_list *call_list);
grpc_connectivity_state grpc_lb_policy_check_connectivity(
grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
grpc_lb_policy *policy, grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */
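For callers, the new signatures mean the pick target, the completion closure, and the call list are all supplied by the caller; the policy only queues deferred work. A hypothetical caller-side sketch follows (pick_request, on_picked, and start_pick are illustrative names, not part of this commit; client_channel.c above embeds the closure in call_data in the same way):

#include <grpc/support/alloc.h>

typedef struct pick_request {
  grpc_subchannel *target;  /* filled in by the policy */
  grpc_closure on_complete; /* must outlive a possibly asynchronous pick */
} pick_request;

static void on_picked(void *arg, int success) {
  pick_request *pr = arg;
  /* pr->target is now the chosen subchannel, or NULL if the pick failed */
  gpr_free(pr);
}

static void start_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
                       grpc_metadata_batch *initial_metadata,
                       grpc_call_list *call_list) {
  pick_request *pr = gpr_malloc(sizeof(*pr));
  grpc_closure_init(&pr->on_complete, on_picked, pr);
  grpc_lb_policy_pick(policy, pollset, initial_metadata, &pr->target,
                      &pr->on_complete, call_list);
  /* the caller is responsible for grpc_call_list_run(*call_list) once any
     locks it holds have been dropped */
}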

@ -78,6 +78,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver,
void grpc_resolver_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
grpc_closure *on_complete) {
resolver->vtable->next(resolver, target_config, on_complete);
}

@ -55,7 +55,7 @@ struct grpc_resolver_vtable {
struct sockaddr *failing_address,
int failing_address_len);
void (*next)(grpc_resolver *resolver, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
grpc_closure *on_complete);
};
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
@ -92,6 +92,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver,
schedule on_complete. */
void grpc_resolver_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
grpc_closure *on_complete);
#endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_H */

@ -69,7 +69,7 @@ typedef struct {
/** which version of resolved_config is current? */
int resolved_version;
/** pending next completion, or NULL */
grpc_iomgr_closure *next_completion;
grpc_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
/** current (fully resolved) config */
@ -79,7 +79,7 @@ typedef struct {
static void dns_destroy(grpc_resolver *r);
static void dns_start_resolving_locked(dns_resolver *r);
static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r)
static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r)
GRPC_MUST_USE_RESULT;
static void dns_shutdown(grpc_resolver *r);
@ -87,14 +87,14 @@ static void dns_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void dns_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
grpc_closure *on_complete);
static const grpc_resolver_vtable dns_resolver_vtable = {
dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next};
static void dns_shutdown(grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
grpc_iomgr_closure *next_completion;
grpc_closure *next_completion;
gpr_mu_lock(&r->mu);
next_completion = r->next_completion;
r->next_completion = NULL;
@ -117,9 +117,9 @@ static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
static void dns_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
grpc_closure *on_complete) {
dns_resolver *r = (dns_resolver *)resolver;
grpc_iomgr_closure *call = NULL;
grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
@ -141,7 +141,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
grpc_iomgr_closure *call;
grpc_closure *call;
size_t i;
if (addresses) {
grpc_lb_policy_args lb_policy_args;
@ -188,8 +188,8 @@ static void dns_start_resolving_locked(dns_resolver *r) {
grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r);
}
static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) {
grpc_iomgr_closure *ret = NULL;
static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) {
grpc_closure *ret = NULL;
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;

@ -73,22 +73,22 @@ typedef struct {
/** have we published? */
int published;
/** pending next completion, or NULL */
grpc_iomgr_closure *next_completion;
grpc_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
} sockaddr_resolver;
static void sockaddr_destroy(grpc_resolver *r);
static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked(
sockaddr_resolver *r) GRPC_MUST_USE_RESULT;
static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r)
GRPC_MUST_USE_RESULT;
static void sockaddr_shutdown(grpc_resolver *r);
static void sockaddr_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
grpc_closure *on_complete);
static const grpc_resolver_vtable sockaddr_resolver_vtable = {
sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error,
@ -96,7 +96,7 @@ static const grpc_resolver_vtable sockaddr_resolver_vtable = {
static void sockaddr_shutdown(grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
grpc_iomgr_closure *call = NULL;
grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
@ -114,9 +114,9 @@ static void sockaddr_channel_saw_error(grpc_resolver *resolver,
static void sockaddr_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
grpc_closure *on_complete) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
grpc_iomgr_closure *call = NULL;
grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
@ -126,14 +126,13 @@ static void sockaddr_next(grpc_resolver *resolver,
if (call) call->cb(call->cb_arg, 1);
}
static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked(
sockaddr_resolver *r) {
static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_lb_policy_args lb_policy_args;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_iomgr_closure *call = NULL;
grpc_closure *call = NULL;
if (r->next_completion != NULL && !r->published) {
size_t i;

@ -73,7 +73,7 @@ typedef struct {
/** which version of resolved_config is current? */
int resolved_version;
/** pending next completion, or NULL */
grpc_iomgr_closure *next_completion;
grpc_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
/** current (fully resolved) config */
@ -92,15 +92,15 @@ typedef struct {
static void zookeeper_destroy(grpc_resolver *r);
static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked(
zookeeper_resolver *r) GRPC_MUST_USE_RESULT;
static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r)
GRPC_MUST_USE_RESULT;
static void zookeeper_shutdown(grpc_resolver *r);
static void zookeeper_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
grpc_closure *on_complete);
static const grpc_resolver_vtable zookeeper_resolver_vtable = {
zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error,
@ -108,7 +108,7 @@ static const grpc_resolver_vtable zookeeper_resolver_vtable = {
static void zookeeper_shutdown(grpc_resolver *resolver) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
grpc_iomgr_closure *call = NULL;
grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
@ -134,9 +134,9 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver,
static void zookeeper_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
grpc_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
grpc_iomgr_closure *call;
grpc_closure *call;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->next_completion == NULL);
r->next_completion = on_complete;
@ -189,7 +189,7 @@ static void zookeeper_on_resolved(void *arg,
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
grpc_iomgr_closure *call;
grpc_closure *call;
size_t i;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
@ -414,9 +414,8 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
zookeeper_resolve_address(r);
}
static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked(
zookeeper_resolver *r) {
grpc_iomgr_closure *call = NULL;
static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
grpc_closure *call = NULL;
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;

@ -59,7 +59,7 @@ typedef struct {
} connection;
typedef struct {
grpc_iomgr_closure closure;
grpc_closure closure;
size_t version;
grpc_subchannel *subchannel;
grpc_connectivity_state connectivity_state;
@ -67,11 +67,11 @@ typedef struct {
typedef struct waiting_for_connect {
struct waiting_for_connect *next;
grpc_iomgr_closure *notify;
grpc_closure *notify;
grpc_pollset *pollset;
grpc_subchannel_call **target;
grpc_subchannel *subchannel;
grpc_iomgr_closure continuation;
grpc_closure continuation;
} waiting_for_connect;
struct grpc_subchannel {
@ -100,7 +100,7 @@ struct grpc_subchannel {
grpc_connect_out_args connecting_result;
/** callback for connection finishing */
grpc_iomgr_closure connected;
grpc_closure connected;
/** pollset_set tracking who's interested in a connection
being setup - owned by the master channel (in particular the
@ -147,7 +147,7 @@ struct grpc_subchannel_call {
static grpc_subchannel_call *create_call(connection *con);
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason,
grpc_iomgr_call_list *call_list);
grpc_call_list *call_list);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
@ -303,7 +303,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
c->random = random_seed();
grpc_mdctx_ref(c->mdctx);
grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
gpr_mu_init(&c->mu);
@ -334,7 +334,7 @@ static void start_connect(grpc_subchannel *c) {
static void continue_creating_call(void *arg, int iomgr_success) {
waiting_for_connect *w4c = arg;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
w4c->notify, &call_list);
@ -344,8 +344,8 @@ static void continue_creating_call(void *arg, int iomgr_success) {
void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
grpc_subchannel_call **target,
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list) {
grpc_closure *notify,
grpc_call_list *call_list) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@ -364,7 +364,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
w4c->subchannel = c;
/* released when clearing w4c */
SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, pollset);
if (!c->connecting) {
@ -392,8 +392,8 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list) {
grpc_closure *notify,
grpc_call_list *call_list) {
int do_connect = 0;
gpr_mu_lock(&c->mu);
if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
@ -417,7 +417,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
@ -454,7 +454,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_connector_shutdown(c->connector);
}
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
}
static void on_state_changed(void *p, int iomgr_success) {
@ -465,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(mu);
@ -514,11 +514,10 @@ done:
if (destroy_connection != NULL) {
connection_destroy(destroy_connection);
}
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
}
static void publish_transport(grpc_subchannel *c,
grpc_iomgr_call_list *call_list) {
static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
size_t channel_stack_size;
connection *con;
grpc_channel_stack *stk;
@ -552,7 +551,7 @@ static void publish_transport(grpc_subchannel *c,
/* initialize state watcher */
sw = gpr_malloc(sizeof(*sw));
grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw);
grpc_closure_init(&sw->closure, on_state_changed, sw);
sw->subchannel = c;
sw->connectivity_state = GRPC_CHANNEL_READY;
@ -599,7 +598,7 @@ static void publish_transport(grpc_subchannel *c,
while (w4c != NULL) {
waiting_for_connect *next = w4c;
grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1);
grpc_call_list_add(call_list, &w4c->continuation, 1);
w4c = next;
}
@ -641,7 +640,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
static void on_alarm(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
@ -656,12 +655,12 @@ static void on_alarm(void *arg, int iomgr_success) {
GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(c, "connecting");
}
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
}
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
if (c->connecting_result.transport != NULL) {
publish_transport(c, &call_list);
} else {
@ -673,7 +672,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
gpr_mu_unlock(&c->mu);
}
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
}
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@ -705,7 +704,7 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason,
grpc_iomgr_call_list *call_list) {
grpc_call_list *call_list) {
grpc_connectivity_state current = compute_connectivity_locked(c);
grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list);
}

@ -76,8 +76,8 @@ void grpc_subchannel_call_unref(
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
grpc_pollset *pollset,
grpc_subchannel_call **target,
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list);
grpc_closure *notify,
grpc_call_list *call_list);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel,
@ -91,8 +91,8 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
Updates *state with the new state of the channel */
void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list);
grpc_closure *notify,
grpc_call_list *call_list);
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,

@ -63,8 +63,8 @@ typedef struct {
grpc_iomgr_object iomgr_obj;
gpr_slice_buffer incoming;
gpr_slice_buffer outgoing;
grpc_iomgr_closure on_read;
grpc_iomgr_closure done_write;
grpc_closure on_read;
grpc_closure done_write;
grpc_workqueue *workqueue;
} internal_request;
@ -237,8 +237,8 @@ static void internal_request_begin(grpc_httpcli_context *context,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
grpc_iomgr_closure_init(&req->on_read, on_read, req);
grpc_iomgr_closure_init(&req->done_write, done_write, req);
grpc_closure_init(&req->on_read, on_read, req);
grpc_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming);
gpr_slice_buffer_init(&req->outgoing);
grpc_iomgr_register_object(&req->iomgr_obj, name);

@ -35,13 +35,13 @@
grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) {
grpc_closure *cb) {
return ep->vtable->read(ep, slices, cb);
}
grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) {
grpc_closure *cb) {
return ep->vtable->write(ep, slices, cb);
}

@ -54,9 +54,9 @@ typedef enum grpc_endpoint_op_status {
struct grpc_endpoint_vtable {
grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
grpc_iomgr_closure *cb);
grpc_closure *cb);
grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
grpc_iomgr_closure *cb);
grpc_closure *cb);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
void (*shutdown)(grpc_endpoint *ep);
@ -70,7 +70,7 @@ struct grpc_endpoint_vtable {
Valid slices may be placed into \a slices even on callback success == 0. */
grpc_endpoint_op_status grpc_endpoint_read(
grpc_endpoint *ep, gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
grpc_closure *cb) GRPC_MUST_USE_RESULT;
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
@ -86,7 +86,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
*/
grpc_endpoint_op_status grpc_endpoint_write(
grpc_endpoint *ep, gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
grpc_closure *cb) GRPC_MUST_USE_RESULT;
/* Causes any pending read/write callbacks to run immediately with
success==0 */
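A sketch of issuing a read against the closure-based endpoint API above. The grpc_endpoint_op_status constants are not shown in this hunk, so the sketch treats the return value generically; read_state, on_read_done, and start_read are illustrative names rather than code from this commit.

#include <grpc/support/slice_buffer.h>

typedef struct read_state {
  gpr_slice_buffer incoming;
  grpc_closure on_read; /* must outlive a pending read */
} read_state;

static void on_read_done(void *arg, int success) {
  read_state *rs = arg;
  /* per the comment above, rs->incoming may hold valid slices even when
     success == 0 */
}

static void start_read(grpc_endpoint *ep, read_state *rs) {
  grpc_endpoint_op_status status;

  gpr_slice_buffer_init(&rs->incoming);
  grpc_closure_init(&rs->on_read, on_read_done, rs);

  /* GRPC_MUST_USE_RESULT: the status reports whether the read completed
     synchronously; only if it is still pending will on_read_done fire later */
  status = grpc_endpoint_read(ep, &rs->incoming, &rs->on_read);
  (void)status;
}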

@ -218,8 +218,7 @@ static int has_watchers(grpc_fd *fd) {
fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason) {
void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
gpr_mu_lock(&fd->watcher_mu);
@ -253,7 +252,7 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static void process_callback(grpc_iomgr_closure *closure, int success,
static void process_callback(grpc_closure *closure, int success,
grpc_workqueue *optional_workqueue) {
if (optional_workqueue == NULL) {
closure->cb(closure->cb_arg, success);
@ -262,15 +261,15 @@ static void process_callback(grpc_iomgr_closure *closure, int success,
}
}
static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
int success, grpc_workqueue *optional_workqueue) {
static void process_callbacks(grpc_closure *callbacks, size_t n, int success,
grpc_workqueue *optional_workqueue) {
size_t i;
for (i = 0; i < n; i++) {
process_callback(callbacks + i, success, optional_workqueue);
}
}
static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure,
int allow_synchronous_callback) {
switch (gpr_atm_acq_load(st)) {
case NOT_READY:
@ -307,7 +306,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
abort();
}
static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks,
size_t *ncallbacks) {
gpr_intptr state = gpr_atm_acq_load(st);
@ -327,7 +326,7 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
default: /* waiting */
GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
gpr_atm_no_barrier_load(st) != NOT_READY);
callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state;
callbacks[(*ncallbacks)++] = (grpc_closure *)state;
gpr_atm_rel_store(st, NOT_READY);
return;
}
@ -338,7 +337,7 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
grpc_iomgr_closure *closure;
grpc_closure *closure;
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
@ -365,11 +364,11 @@ void grpc_fd_shutdown(grpc_fd *fd) {
0 /* GPR_FALSE */);
}
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure) {
notify_on(fd, &fd->readst, closure, 0);
}
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure) {
notify_on(fd, &fd->writest, closure, 0);
}

@ -96,8 +96,8 @@ struct grpc_fd {
struct grpc_fd *freelist_next;
grpc_iomgr_closure *on_done_closure;
grpc_iomgr_closure *shutdown_closures[2];
grpc_closure *on_done_closure;
grpc_closure *shutdown_closures[2];
grpc_iomgr_object iomgr_object;
};
@ -113,8 +113,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name);
Requires: *fd initialized; no outstanding notify_on_read or
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason);
void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason);
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
@ -153,10 +152,10 @@ void grpc_fd_shutdown(grpc_fd *fd);
underlying platform. This means that users must drain fd in read_cb before
calling notify_on_read again. Users are also expected to handle spurious
events, i.e read_cb is called while nothing can be readable from fd */
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure);
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure);
/* Exactly the same semantics as above, except based on writable events. */
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure);
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure);
/* Notification from the poller to an fd that it has become readable or
writable.
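A small sketch of the notify-on-read contract described above, using the renamed closure type: drain the fd inside the callback, then re-arm. The fd_watcher struct and function names are illustrative only.

typedef struct fd_watcher {
  grpc_fd *fd;
  grpc_closure on_readable;
} fd_watcher;

static void on_readable(void *arg, int success) {
  fd_watcher *w = arg;
  if (!success) return; /* fd shut down; do not re-arm */
  /* drain w->fd here (read until EAGAIN), tolerating spurious wakeups,
     then register interest again: */
  grpc_fd_notify_on_read(w->fd, &w->on_readable);
}

static void watch_fd(fd_watcher *w) {
  grpc_closure_init(&w->on_readable, on_readable, w);
  grpc_fd_notify_on_read(w->fd, &w->on_readable);
}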

@ -151,15 +151,15 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_free(obj->name);
}
void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->next = NULL;
}
void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
grpc_iomgr_closure *closure, int success) {
void grpc_call_list_add(grpc_call_list *call_list, grpc_closure *closure,
int success) {
if (!closure) return;
closure->next = NULL;
closure->success = success;
@ -171,21 +171,20 @@ void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
call_list->tail = closure;
}
void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) {
grpc_iomgr_closure *c = call_list.head;
void grpc_call_list_run(grpc_call_list call_list) {
grpc_closure *c = call_list.head;
while (c) {
grpc_iomgr_closure *next = c->next;
grpc_closure *next = c->next;
c->cb(c->cb_arg, c->success);
c = next;
}
}
int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) {
int grpc_call_list_empty(grpc_call_list call_list) {
return call_list.head == NULL;
}
void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
grpc_iomgr_call_list *dst) {
void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst) {
if (dst->head == NULL) {
*dst = *src;
return;

@ -42,7 +42,7 @@
typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
/** A closure over a grpc_iomgr_cb_func. */
typedef struct grpc_iomgr_closure {
typedef struct grpc_closure {
/** Bound callback. */
grpc_iomgr_cb_func cb;
@ -55,27 +55,26 @@ typedef struct grpc_iomgr_closure {
int success;
/**< Internal. Do not touch */
struct grpc_iomgr_closure *next;
} grpc_iomgr_closure;
struct grpc_closure *next;
} grpc_closure;
typedef struct grpc_iomgr_call_list {
grpc_iomgr_closure *head;
grpc_iomgr_closure *tail;
} grpc_iomgr_call_list;
typedef struct grpc_call_list {
grpc_closure *head;
grpc_closure *tail;
} grpc_call_list;
/** Initializes \a closure with \a cb and \a cb_arg. */
void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg);
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg);
#define GRPC_IOMGR_CALL_LIST_INIT \
#define GRPC_CALL_LIST_INIT \
{ NULL, NULL }
void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list,
grpc_iomgr_closure *closure, int success);
void grpc_iomgr_call_list_run(grpc_iomgr_call_list list);
void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
grpc_iomgr_call_list *dst);
int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list);
void grpc_call_list_add(grpc_call_list *list, grpc_closure *closure,
int success);
void grpc_call_list_run(grpc_call_list list);
void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst);
int grpc_call_list_empty(grpc_call_list list);
/** Initializes the iomgr. */
void grpc_iomgr_init(void);
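The hunk above is the heart of the rename: grpc_iomgr_closure becomes grpc_closure and grpc_iomgr_call_list becomes grpc_call_list, with correspondingly shorter helpers. A minimal usage sketch of those declarations as they now read; the callback and strings are illustrative, not part of this commit.

#include <grpc/support/log.h>

static void on_done(void *arg, int success) {
  gpr_log(GPR_INFO, "%s: success=%d", (const char *)arg, success);
}

static void call_list_example(void) {
  grpc_closure c;
  grpc_call_list call_list = GRPC_CALL_LIST_INIT;

  grpc_closure_init(&c, on_done, (void *)"demo");

  /* queue the closure with its success value rather than invoking it */
  grpc_call_list_add(&call_list, &c, 1);
  GPR_ASSERT(!grpc_call_list_empty(call_list));

  /* run (and thereby drain) everything queued so far */
  grpc_call_list_run(call_list);
}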

@ -53,7 +53,7 @@ typedef struct wakeup_fd_hdl {
typedef struct {
grpc_pollset *pollset;
grpc_fd *fd;
grpc_iomgr_closure closure;
grpc_closure closure;
} delayed_add;
typedef struct {
@ -125,7 +125,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
da->pollset = pollset;
da->fd = fd;
GRPC_FD_REF(fd, "delayed_add");
grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
grpc_pollset_add_unlock_job(pollset, &da->closure);
}

@ -175,30 +175,28 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) {
grpc_iomgr_closure *exec = *root;
static void run_jobs(grpc_pollset *pollset, grpc_closure **root) {
grpc_closure *exec = *root;
*root = NULL;
gpr_mu_unlock(&pollset->mu);
while (exec != NULL) {
grpc_iomgr_closure *next = exec->next;
grpc_closure *next = exec->next;
exec->cb(exec->cb_arg, 1);
exec = next;
}
gpr_mu_lock(&pollset->mu);
}
static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) {
static void add_job(grpc_closure **root, grpc_closure *closure) {
closure->next = *root;
*root = closure;
}
void grpc_pollset_add_idle_job(grpc_pollset *pollset,
grpc_iomgr_closure *closure) {
void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) {
add_job(&pollset->idle_jobs, closure);
}
void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
grpc_iomgr_closure *closure) {
void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) {
add_job(&pollset->unlock_jobs, closure);
}
@ -316,7 +314,7 @@ typedef struct grpc_unary_promote_args {
const grpc_pollset_vtable *original_vtable;
grpc_pollset *pollset;
grpc_fd *fd;
grpc_iomgr_closure promotion_closure;
grpc_closure promotion_closure;
} grpc_unary_promote_args;
static void basic_do_promote(void *args, int success) {

@ -67,8 +67,8 @@ typedef struct grpc_pollset {
int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
grpc_iomgr_closure *unlock_jobs;
grpc_iomgr_closure *idle_jobs;
grpc_closure *unlock_jobs;
grpc_closure *idle_jobs;
union {
int fd;
void *ptr;
@ -128,10 +128,8 @@ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
/** schedule a closure to be run next time there are no active workers */
void grpc_pollset_add_idle_job(grpc_pollset *pollset,
grpc_iomgr_closure *closure);
void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure);
/** schedule a closure to be run next time the pollset is unlocked */
void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
grpc_iomgr_closure *closure);
void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */

@ -91,7 +91,7 @@ typedef struct grpc_winsocket {
This prevents that. */
int added_to_iocp;
grpc_iomgr_closure shutdown_closure;
grpc_closure shutdown_closure;
/* A label for iomgr to track outstanding objects */
grpc_iomgr_object iomgr_object;

@ -64,7 +64,7 @@ typedef struct {
gpr_timespec deadline;
grpc_alarm alarm;
int refs;
grpc_iomgr_closure write_closure;
grpc_closure write_closure;
grpc_pollset_set *interested_parties;
char *addr_str;
} async_connect;

@ -85,11 +85,11 @@ typedef struct {
/** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
size_t outgoing_byte_idx;
grpc_iomgr_closure *read_cb;
grpc_iomgr_closure *write_cb;
grpc_closure *read_cb;
grpc_closure *write_cb;
grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure;
grpc_closure read_closure;
grpc_closure write_closure;
char *peer_string;
} grpc_tcp;
@ -145,7 +145,7 @@ static void tcp_destroy(grpc_endpoint *ep) {
}
static void call_read_cb(grpc_tcp *tcp, int success) {
grpc_iomgr_closure *cb = tcp->read_cb;
grpc_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
@ -250,7 +250,7 @@ static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
gpr_slice_buffer *incoming_buffer,
grpc_iomgr_closure *cb) {
grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
@ -350,7 +350,7 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
grpc_endpoint_op_status status;
grpc_iomgr_closure *cb;
grpc_closure *cb;
if (!success) {
cb = tcp->write_cb;
@ -375,7 +375,7 @@ static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
gpr_slice_buffer *buf,
grpc_iomgr_closure *cb) {
grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_endpoint_op_status status;
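
For the endpoints converted above, the read path returns a grpc_endpoint_op_status instead of always completing through the callback. A hedged sketch of the calling convention, mirroring the switch used in the tests further down; GRPC_ENDPOINT_PENDING and GRPC_ENDPOINT_ERROR are assumed names for the non-immediate outcomes:

static void on_read_done(void *arg /* gpr_slice_buffer */, int success) {
  /* consume the slices that were appended to the buffer */
}

static void start_read(grpc_endpoint *ep, gpr_slice_buffer *incoming,
                       grpc_closure *read_closure) {
  grpc_closure_init(read_closure, on_read_done, incoming);
  switch (grpc_endpoint_read(ep, incoming, read_closure)) {
    case GRPC_ENDPOINT_DONE: /* completed inline: data is already in incoming */
      on_read_done(incoming, 1);
      break;
    case GRPC_ENDPOINT_PENDING: /* assumed name: the closure fires later */
      break;
    case GRPC_ENDPOINT_ERROR: /* assumed name: report the failure */
      on_read_done(incoming, 0);
      break;
  }
}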

@ -84,8 +84,8 @@ typedef struct {
struct sockaddr_un un;
} addr;
size_t addr_len;
grpc_iomgr_closure read_closure;
grpc_iomgr_closure destroyed_closure;
grpc_closure read_closure;
grpc_closure destroyed_closure;
} server_port;
static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {

@ -82,8 +82,8 @@ typedef struct grpc_tcp {
/* Refcounting how many operations are in progress. */
gpr_refcount refcount;
grpc_iomgr_closure *read_cb;
grpc_iomgr_closure *write_cb;
grpc_closure *read_cb;
grpc_closure *write_cb;
gpr_slice read_slice;
gpr_slice_buffer *write_slices;
gpr_slice_buffer *read_slices;
@ -169,7 +169,7 @@ static int on_read(grpc_tcp *tcp, int success) {
static void on_read_cb(void *tcpp, int from_iocp) {
grpc_tcp *tcp = tcpp;
grpc_iomgr_closure *cb = tcp->read_cb;
grpc_closure *cb = tcp->read_cb;
int success = on_read(tcp, from_iocp);
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
@ -180,7 +180,7 @@ static void on_read_cb(void *tcpp, int from_iocp) {
static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
gpr_slice_buffer *read_slices,
grpc_iomgr_closure *cb) {
grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->read_info;
@ -241,7 +241,7 @@ static void on_write(void *tcpp, int success) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_iomgr_closure *cb;
grpc_closure *cb;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
@ -269,7 +269,7 @@ static void on_write(void *tcpp, int success) {
/* Initiates a write. */
static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) {
grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *socket = tcp->socket;
grpc_winsocket_callback_info *info = &socket->write_info;

@ -79,8 +79,8 @@ typedef struct {
struct sockaddr_un un;
} addr;
size_t addr_len;
grpc_iomgr_closure read_closure;
grpc_iomgr_closure destroyed_closure;
grpc_closure read_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
} server_port;

@ -52,7 +52,7 @@ typedef struct grpc_workqueue grpc_workqueue;
/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(void);
void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously);
void grpc_workqueue_flush(grpc_workqueue *workqueue);
#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
@ -76,7 +76,7 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
grpc_pollset *pollset);
/** Add a work item to a workqueue */
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
int success);
#endif
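
With the second argument to grpc_workqueue_flush gone, draining a queue is always synchronous at the call site. A small sketch of push-then-flush against the declarations above; on_cleanup and its argument are illustrative:

#include <grpc/support/alloc.h>
#include "src/core/iomgr/workqueue.h"

static void on_cleanup(void *arg, int success) { gpr_free(arg); }

static void queue_and_drain(grpc_workqueue *wq, grpc_closure *c, void *arg) {
  grpc_closure_init(c, on_cleanup, arg);
  grpc_workqueue_push(wq, c, 1 /* success value handed to the callback */);
  grpc_workqueue_flush(wq); /* runs queued closures on the calling thread */
}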

@ -41,7 +41,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
@ -52,71 +52,19 @@ grpc_workqueue *grpc_workqueue_create(void) {
grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&workqueue->refs, 1);
gpr_mu_init(&workqueue->mu);
workqueue->head.next = NULL;
workqueue->tail = &workqueue->head;
workqueue->call_list.head = workqueue->call_list.tail = NULL;
grpc_wakeup_fd_init(&workqueue->wakeup_fd);
sprintf(name, "workqueue:%p", (void *)workqueue);
workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */
workqueue->wakeup_read_fd = grpc_fd_create(
GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name);
grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue);
grpc_closure_init(&workqueue->read_closure, on_readable, workqueue);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
return workqueue;
}
static void shutdown_thread(void *arg) {
grpc_iomgr_closure *todo = arg;
while (todo) {
grpc_iomgr_closure *next = todo->next;
todo->cb(todo->cb_arg, todo->success);
todo = next;
}
}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static size_t count_waiting(grpc_workqueue *workqueue) {
size_t i = 0;
grpc_iomgr_closure *c;
for (c = workqueue->head.next; c; c = c->next) {
i++;
}
return i;
}
#endif
void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) {
grpc_iomgr_closure *todo;
gpr_thd_id thd;
gpr_mu_lock(&workqueue->mu);
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
if (workqueue->head.next) {
gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue,
count_waiting(workqueue),
asynchronously ? "asynchronously" : "synchronously");
}
#endif
todo = workqueue->head.next;
workqueue->head.next = NULL;
workqueue->tail = &workqueue->head;
gpr_mu_unlock(&workqueue->mu);
if (todo != NULL) {
if (asynchronously) {
gpr_thd_new(&thd, shutdown_thread, todo, NULL);
} else {
while (todo) {
grpc_iomgr_closure *next = todo->next;
todo->cb(todo->cb_arg, todo->success);
todo = next;
}
}
}
}
static void workqueue_destroy(grpc_workqueue *workqueue) {
GPR_ASSERT(workqueue->tail == &workqueue->head);
GPR_ASSERT(grpc_call_list_empty(workqueue->call_list));
grpc_fd_shutdown(workqueue->wakeup_read_fd);
}
@ -151,9 +99,16 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd);
}
void grpc_workqueue_flush(grpc_workqueue *workqueue) {
grpc_call_list todo = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&workqueue->mu);
GPR_SWAP(grpc_call_list, todo, workqueue->call_list);
gpr_mu_unlock(&workqueue->mu);
grpc_call_list_run(todo);
}
static void on_readable(void *arg, int success) {
grpc_workqueue *workqueue = arg;
grpc_iomgr_closure *todo;
if (!success) {
gpr_mu_destroy(&workqueue->mu);
@ -162,42 +117,26 @@ static void on_readable(void *arg, int success) {
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy");
gpr_free(workqueue);
return;
} else {
grpc_call_list todo = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&workqueue->mu);
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
count_waiting(workqueue));
#endif
todo = workqueue->head.next;
workqueue->head.next = NULL;
workqueue->tail = &workqueue->head;
GPR_SWAP(grpc_call_list, todo, workqueue->call_list);
grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
while (todo) {
grpc_iomgr_closure *next = todo->next;
todo->cb(todo->cb_arg, todo->success);
todo = next;
}
grpc_call_list_run(todo);
}
}
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
int success) {
closure->success = success;
closure->next = NULL;
gpr_mu_lock(&workqueue->mu);
if (workqueue->tail == &workqueue->head) {
if (grpc_call_list_empty(workqueue->call_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
workqueue->tail->next = closure;
workqueue->tail = closure;
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
count_waiting(workqueue));
#endif
grpc_call_list_add(&workqueue->call_list, closure, success);
gpr_mu_unlock(&workqueue->mu);
}
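
The intrusive head/tail fields are replaced here by a grpc_call_list, whose definition is not part of this diff. Inferred from the call sites above (GRPC_CALL_LIST_INIT, grpc_call_list_add, grpc_call_list_empty taking the struct by value, grpc_call_list_run taking it by value), a plausible shape is roughly:

/* Not part of this diff: a sketch of grpc_call_list inferred from usage. */
typedef struct grpc_call_list {
  grpc_closure *head;
  grpc_closure *tail;
} grpc_call_list;
#define GRPC_CALL_LIST_INIT \
  { NULL, NULL }

void grpc_call_list_add(grpc_call_list *list, grpc_closure *c, int success) {
  c->success = success;
  c->next = NULL;
  if (list->head == NULL) {
    list->head = c;
  } else {
    list->tail->next = c;
  }
  list->tail = c;
}

int grpc_call_list_empty(grpc_call_list list) { return list.head == NULL; }

void grpc_call_list_run(grpc_call_list list) {
  grpc_closure *c = list.head;
  while (c != NULL) {
    grpc_closure *next = c->next;
    c->cb(c->cb_arg, c->success);
    c = next;
  }
}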

@ -40,13 +40,12 @@ struct grpc_workqueue {
gpr_refcount refs;
gpr_mu mu;
grpc_iomgr_closure head;
grpc_iomgr_closure *tail;
grpc_call_list call_list;
grpc_wakeup_fd wakeup_fd;
struct grpc_fd *wakeup_read_fd;
grpc_iomgr_closure read_closure;
grpc_closure read_closure;
};
#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */

@ -49,9 +49,9 @@ typedef struct {
struct tsi_frame_protector *protector;
gpr_mu protector_mu;
/* saved upper level callbacks and user_data. */
grpc_iomgr_closure *read_cb;
grpc_iomgr_closure *write_cb;
grpc_iomgr_closure on_read;
grpc_closure *read_cb;
grpc_closure *write_cb;
grpc_closure on_read;
gpr_slice_buffer *read_buffer;
gpr_slice_buffer source_buffer;
/* saved handshaker leftover data to unprotect. */
@ -214,7 +214,7 @@ static void on_read_cb(void *user_data, int success) {
static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) {
grpc_closure *cb) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
int immediate_read_success = -1;
ep->read_cb = cb;
@ -257,7 +257,7 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
gpr_slice_buffer *slices,
grpc_iomgr_closure *cb) {
grpc_closure *cb) {
unsigned i;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep;
@ -386,7 +386,7 @@ grpc_endpoint *grpc_secure_endpoint_create(
gpr_slice_buffer_init(&ep->output_buffer);
gpr_slice_buffer_init(&ep->source_buffer);
ep->read_buffer = NULL;
grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep);
grpc_closure_init(&ep->on_read, on_read_cb, ep);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;

@ -54,8 +54,8 @@ typedef struct {
gpr_slice_buffer outgoing;
grpc_secure_transport_setup_done_cb cb;
void *user_data;
grpc_iomgr_closure on_handshake_data_sent_to_peer;
grpc_iomgr_closure on_handshake_data_received_from_peer;
grpc_closure on_handshake_data_sent_to_peer;
grpc_closure on_handshake_data_received_from_peer;
} grpc_secure_transport_setup;
static void on_handshake_data_received_from_peer(void *setup, int success);
@ -301,10 +301,10 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
s->wrapped_endpoint = nonsecure_endpoint;
s->user_data = user_data;
s->cb = cb;
grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer,
on_handshake_data_sent_to_peer, s);
grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer,
on_handshake_data_received_from_peer, s);
grpc_closure_init(&s->on_handshake_data_sent_to_peer,
on_handshake_data_sent_to_peer, s);
grpc_closure_init(&s->on_handshake_data_received_from_peer,
on_handshake_data_received_from_peer, s);
gpr_slice_buffer_init(&s->left_overs);
gpr_slice_buffer_init(&s->outgoing);
gpr_slice_buffer_init(&s->incoming);

@ -44,11 +44,11 @@ typedef struct call_data {
gpr_uint8 got_client_metadata;
grpc_stream_op_buffer *recv_ops;
/* Closure to call when finished with the auth_on_recv hook. */
grpc_iomgr_closure *on_done_recv;
grpc_closure *on_done_recv;
/* Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member after
handling it. */
grpc_iomgr_closure auth_on_recv;
grpc_closure auth_on_recv;
grpc_transport_stream_op transport_op;
grpc_metadata_array md;
const grpc_metadata *consumed_md;
@ -202,7 +202,7 @@ static void init_call_elem(grpc_call_element *elem,
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_iomgr_closure_init(&calld->auth_on_recv, auth_on_recv, elem);
grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem);
GPR_ASSERT(initial_op && initial_op->context != NULL &&
initial_op->context[GRPC_CONTEXT_SECURITY].value == NULL);

@ -256,10 +256,10 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
gpr_uint32 incoming_message_flags;
grpc_iomgr_closure destroy_closure;
grpc_iomgr_closure on_done_recv;
grpc_iomgr_closure on_done_send;
grpc_iomgr_closure on_done_bind;
grpc_closure destroy_closure;
grpc_closure on_done_recv;
grpc_closure on_done_send;
grpc_closure on_done_bind;
/** completion events - for completion queue use */
grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
@ -333,9 +333,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message);
grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
grpc_closure_init(&call->on_done_recv, call_on_done_recv, call);
grpc_closure_init(&call->on_done_send, call_on_done_send, call);
grpc_closure_init(&call->on_done_bind, finished_loose_op, call);
/* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata.
@ -1353,7 +1353,7 @@ static void finished_loose_op(void *call, int success_ignored) {
typedef struct {
grpc_call *call;
grpc_iomgr_closure closure;
grpc_closure closure;
} finished_loose_op_allocated_args;
static void finished_loose_op_allocated(void *alloc, int success) {
@ -1373,8 +1373,7 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
} else {
finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
args->call = call;
grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated,
args);
grpc_closure_init(&args->closure, finished_loose_op_allocated, args);
op->on_consumed = &args->closure;
}
}

@ -45,7 +45,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
/* forward through to the underlying client channel */
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_connectivity_state state;
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
@ -56,7 +56,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
}
state = grpc_client_channel_check_connectivity_state(
client_channel_elem, try_to_connect, &call_list);
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
return state;
}
@ -71,7 +71,7 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
int success;
grpc_iomgr_closure on_complete;
grpc_closure on_complete;
grpc_alarm alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
@ -158,13 +158,13 @@ void grpc_channel_watch_connectivity_state(
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
state_watcher *w = gpr_malloc(sizeof(*w));
grpc_cq_begin_op(cq);
gpr_mu_init(&w->mu);
grpc_iomgr_closure_init(&w->on_complete, watch_complete, w);
grpc_closure_init(&w->on_complete, watch_complete, w);
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
@ -181,7 +181,7 @@ void grpc_channel_watch_connectivity_state(
"grpc_channel_watch_connectivity_state called on something that is "
"not a client channel, but '%s'",
client_channel_elem->filter->name);
grpc_iomgr_call_list_add(&call_list, &w->on_complete, 1);
grpc_call_list_add(&call_list, &w->on_complete, 1);
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem,
@ -190,5 +190,5 @@ void grpc_channel_watch_connectivity_state(
&w->on_complete, &call_list);
}
grpc_iomgr_call_list_run(call_list);
grpc_call_list_run(call_list);
}
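
channel_connectivity.c shows the pattern this change introduces at surface entry points: build a grpc_call_list on the stack, let the internals append closures to it while their locks are held, and run it once at the end with no locks held. A condensed sketch of that pattern; do_locked_work and notify_done are hypothetical stand-ins:

static void notify_done(void *arg, int success) { /* user-visible callback */ }

static void do_locked_work(grpc_call_list *call_list, grpc_closure *c) {
  /* stands in for code that runs under an internal lock */
  grpc_closure_init(c, notify_done, NULL);
  grpc_call_list_add(call_list, c, 1);
}

static void surface_entry_point(grpc_closure *c) {
  grpc_call_list call_list = GRPC_CALL_LIST_INIT;
  do_locked_work(&call_list, c); /* closures accumulate in call_list */
  grpc_call_list_run(call_list); /* ...and run here, with no locks held */
}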

@ -52,7 +52,7 @@ typedef struct {
grpc_connector base;
gpr_refcount refs;
grpc_iomgr_closure *notify;
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
} connector;
@ -71,7 +71,7 @@ static void connector_unref(grpc_connector *con) {
static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
grpc_iomgr_closure *notify;
grpc_closure *notify;
if (tcp != NULL) {
c->result->transport = grpc_create_chttp2_transport(
c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue,
@ -94,7 +94,7 @@ static void connector_shutdown(grpc_connector *con) {}
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
grpc_iomgr_closure *notify) {
grpc_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);

@ -58,7 +58,7 @@ typedef struct {
grpc_channel_security_connector *security_connector;
grpc_iomgr_closure *notify;
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
@ -83,7 +83,7 @@ static void on_secure_transport_setup_done(void *arg,
grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
connector *c = arg;
grpc_iomgr_closure *notify;
grpc_closure *notify;
gpr_mu_lock(&c->mu);
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
@ -114,7 +114,7 @@ static void on_secure_transport_setup_done(void *arg,
static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
grpc_iomgr_closure *notify;
grpc_closure *notify;
if (tcp != NULL) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->connecting_endpoint == NULL);
@ -145,7 +145,7 @@ static void connector_shutdown(grpc_connector *con) {
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
grpc_iomgr_closure *notify) {
grpc_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);

@ -113,8 +113,8 @@ struct channel_data {
channel_registered_method *registered_methods;
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
grpc_iomgr_closure finish_destroy_channel_closure;
grpc_iomgr_closure channel_connectivity_changed;
grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed;
};
typedef struct shutdown_tag {
@ -153,10 +153,10 @@ struct call_data {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
grpc_iomgr_closure *on_done_recv;
grpc_closure *on_done_recv;
grpc_iomgr_closure server_on_recv;
grpc_iomgr_closure kill_zombie_closure;
grpc_closure server_on_recv;
grpc_closure kill_zombie_closure;
call_data *pending_next;
};
@ -254,7 +254,7 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
}
struct shutdown_cleanup_args {
grpc_iomgr_closure closure;
grpc_closure closure;
gpr_slice slice;
};
@ -277,7 +277,7 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
op.goaway_message = &sc->slice;
op.goaway_status = GRPC_STATUS_OK;
op.disconnect = send_disconnect;
grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
op.on_consumed = &sc->closure;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
@ -323,7 +323,7 @@ static void request_matcher_zombify_all_pending_calls(
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
@ -420,7 +420,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
return;
}
@ -603,7 +603,7 @@ static void server_on_recv(void *ptr, int success) {
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_workqueue_push(chand->server->workqueue,
&calld->kill_zombie_closure, 1);
} else {
@ -615,7 +615,7 @@ static void server_on_recv(void *ptr, int success) {
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_workqueue_push(chand->server->workqueue,
&calld->kill_zombie_closure, 1);
} else if (calld->state == PENDING) {
@ -689,7 +689,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
grpc_closure_init(&calld->server_on_recv, server_on_recv, elem);
server_ref(chand->server);
@ -729,8 +729,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
channel_connectivity_changed, chand);
grpc_closure_init(&chand->channel_connectivity_changed,
channel_connectivity_changed, chand);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
@ -1077,7 +1077,7 @@ void grpc_server_destroy(grpc_server *server) {
gpr_mu_unlock(&server->mu_global);
grpc_workqueue_flush(server->workqueue, 0);
grpc_workqueue_flush(server->workqueue);
server_unref(server);
}
@ -1132,7 +1132,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
gpr_mu_lock(&calld->mu_state);
if (calld->state == ZOMBIED) {
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);

@ -155,7 +155,7 @@ typedef enum {
/* Outstanding ping request data */
typedef struct grpc_chttp2_outstanding_ping {
gpr_uint8 id[8];
grpc_iomgr_closure *on_recv;
grpc_closure *on_recv;
struct grpc_chttp2_outstanding_ping *next;
struct grpc_chttp2_outstanding_ping *prev;
} grpc_chttp2_outstanding_ping;
@ -164,7 +164,7 @@ typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** queued callbacks */
grpc_iomgr_call_list run_at_unlock;
grpc_call_list run_at_unlock;
/** window available for us to send to peer */
gpr_int64 outgoing_window;
@ -214,7 +214,7 @@ typedef struct {
/** is this a client? */
gpr_uint8 is_client;
/** callback for when writing is done */
grpc_iomgr_closure done_cb;
grpc_closure done_cb;
} grpc_chttp2_transport_writing;
struct grpc_chttp2_transport_parsing {
@ -332,9 +332,9 @@ struct grpc_chttp2_transport {
grpc_chttp2_stream_map new_stream_map;
/** closure to execute writing */
grpc_iomgr_closure writing_action;
grpc_closure writing_action;
/** closure to finish reading from the endpoint */
grpc_iomgr_closure recv_data;
grpc_closure recv_data;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@ -359,8 +359,8 @@ typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
gpr_uint32 id;
grpc_iomgr_closure *send_done_closure;
grpc_iomgr_closure *recv_done_closure;
grpc_closure *send_done_closure;
grpc_closure *recv_done_closure;
/** window available for us to send to peer */
gpr_int64 outgoing_window;

@ -238,8 +238,8 @@ void grpc_chttp2_cleanup_writing(
stream_global->outgoing_sopb->nops == 0) {
GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
stream_global->outgoing_sopb = NULL;
grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 1);
grpc_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 1);
}
}
stream_global->writing_now = 0;

@ -250,15 +250,15 @@ static void init_transport(grpc_chttp2_transport *t,
gpr_slice_buffer_init(&t->writing.outbuf);
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->writing_action, writing_action, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
&t->writing);
grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
&t->writing);
grpc_closure_init(&t->recv_data, recv_data, t);
gpr_slice_buffer_init(&t->read_buffer);
if (is_client) {
@ -499,20 +499,20 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_call_list run = GRPC_IOMGR_CALL_LIST_INIT;
grpc_call_list run = GRPC_CALL_LIST_INIT;
unlock_check_read_write_state(t);
if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
grpc_iomgr_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1);
grpc_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1);
prevent_endpoint_shutdown(t);
}
GPR_SWAP(grpc_iomgr_call_list, run, t->global.run_at_unlock);
GPR_SWAP(grpc_call_list, run, t->global.run_at_unlock);
gpr_mu_unlock(&t->mu);
grpc_iomgr_call_list_run(run);
grpc_call_list_run(run);
}
/*
@ -664,8 +664,8 @@ static void perform_stream_op_locked(
}
} else {
grpc_sopb_reset(op->send_ops);
grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 0);
grpc_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 0);
}
}
@ -703,8 +703,7 @@ static void perform_stream_op_locked(
op->bind_pollset);
}
grpc_iomgr_call_list_add(&transport_global->run_at_unlock, op->on_consumed,
1);
grpc_call_list_add(&transport_global->run_at_unlock, op->on_consumed, 1);
}
static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
@ -717,8 +716,7 @@ static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
unlock(t);
}
static void send_ping_locked(grpc_chttp2_transport *t,
grpc_iomgr_closure *on_recv) {
static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
p->next = &t->global.pings;
p->prev = p->next->prev;
@ -741,7 +739,7 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
lock(t);
grpc_iomgr_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1);
grpc_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1);
if (op->on_connectivity_state_change) {
grpc_connectivity_state_notify_on_state_change(
@ -868,8 +866,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (stream_global->outgoing_sopb != NULL) {
grpc_sopb_reset(stream_global->outgoing_sopb);
stream_global->outgoing_sopb = NULL;
grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 1);
grpc_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 1);
}
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
@ -919,8 +917,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
&stream_global->outstanding_metadata);
grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
stream_global->published_state = *stream_global->publish_state = state;
grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
stream_global->recv_done_closure, 1);
grpc_call_list_add(&transport_global->run_at_unlock,
stream_global->recv_done_closure, 1);
stream_global->recv_done_closure = NULL;
stream_global->publish_sopb = NULL;
stream_global->publish_state = NULL;

@ -91,7 +91,7 @@ grpc_connectivity_state grpc_connectivity_state_check(
int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list) {
grpc_closure *notify, grpc_call_list *call_list) {
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p",
tracker->name, grpc_connectivity_state_name(*current),
@ -99,7 +99,7 @@ int grpc_connectivity_state_notify_on_state_change(
}
if (tracker->current_state != *current) {
*current = tracker->current_state;
grpc_iomgr_call_list_add(call_list, notify, 1);
grpc_call_list_add(call_list, notify, 1);
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@ -113,7 +113,7 @@ int grpc_connectivity_state_notify_on_state_change(
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
const char *reason,
grpc_iomgr_call_list *call_list) {
grpc_call_list *call_list) {
grpc_connectivity_state_watcher *w;
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
@ -128,7 +128,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
grpc_iomgr_call_list_add(call_list, w->notify, 1);
grpc_call_list_add(call_list, w->notify, 1);
gpr_free(w);
}
}

@ -42,7 +42,7 @@ typedef struct grpc_connectivity_state_watcher {
/** we keep watchers in a linked list */
struct grpc_connectivity_state_watcher *next;
/** closure to notify on change */
grpc_iomgr_closure *notify;
grpc_closure *notify;
/** the current state as believed by the watcher */
grpc_connectivity_state *current;
} grpc_connectivity_state_watcher;
@ -67,8 +67,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
* external lock */
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
const char *reason,
grpc_iomgr_call_list *call_list);
const char *reason, grpc_call_list *call_list);
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);
@ -76,6 +75,6 @@ grpc_connectivity_state grpc_connectivity_state_check(
/** Return 1 if the channel should start connecting, 0 otherwise */
int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list);
grpc_closure *notify, grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */
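
Per the comment above, notify-on-state-change returns 1 when the caller should start connecting; the closure is either queued on call_list immediately (the state already differs) or parked on the watcher list. A hedged usage sketch, where every name other than the declared API is illustrative:

static void on_state_changed(void *arg, int success) {
  /* the grpc_connectivity_state pointed to by arg now holds the new state */
}

static void watch_state(grpc_connectivity_state_tracker *tracker,
                        grpc_connectivity_state *current, grpc_closure *notify,
                        grpc_call_list *call_list) {
  grpc_closure_init(notify, on_state_changed, current);
  if (grpc_connectivity_state_notify_on_state_change(tracker, current, notify,
                                                     call_list)) {
    /* returned 1: the caller should kick off a connection attempt */
  }
}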

@ -101,8 +101,8 @@ void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
typedef struct {
gpr_slice message;
grpc_iomgr_closure *then_call;
grpc_iomgr_closure closure;
grpc_closure *then_call;
grpc_closure closure;
} close_message_data;
static void free_message(void *p, int iomgr_success) {
@ -130,7 +130,7 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
cmd = gpr_malloc(sizeof(*cmd));
cmd->message = *optional_message;
cmd->then_call = op->on_consumed;
grpc_iomgr_closure_init(&cmd->closure, free_message, cmd);
grpc_closure_init(&cmd->closure, free_message, cmd);
op->on_consumed = &cmd->closure;
op->optional_close_message = &cmd->message;
}

@ -64,11 +64,11 @@ typedef enum grpc_stream_state {
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
grpc_iomgr_closure *on_consumed;
grpc_closure *on_consumed;
grpc_stream_op_buffer *send_ops;
int is_last_send;
grpc_iomgr_closure *on_done_send;
grpc_closure *on_done_send;
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
@ -76,7 +76,7 @@ typedef struct grpc_transport_stream_op {
These bytes will be eventually used to replenish per-stream flow control
windows. */
size_t max_recv_bytes;
grpc_iomgr_closure *on_done_recv;
grpc_closure *on_done_recv;
grpc_pollset *bind_pollset;
@ -95,9 +95,9 @@ typedef struct grpc_transport_stream_op {
/** Transport op: a set of operations to perform on a transport as a whole */
typedef struct grpc_transport_op {
/** called when processing of this op is done */
grpc_iomgr_closure *on_consumed;
grpc_closure *on_consumed;
/** connectivity monitoring */
grpc_iomgr_closure *on_connectivity_state_change;
grpc_closure *on_connectivity_state_change;
grpc_connectivity_state *connectivity_state;
/** should the transport be disconnected */
int disconnect;
@ -118,7 +118,7 @@ typedef struct grpc_transport_op {
/** add this transport to a pollset_set */
grpc_pollset_set *bind_pollset_set;
/** send a ping, call this back if not NULL */
grpc_iomgr_closure *send_ping;
grpc_closure *send_ping;
} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this
@ -181,7 +181,7 @@ void grpc_transport_perform_op(grpc_transport *transport,
/* Send a ping on a transport
Calls cb with user data when a response is received. */
void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb);
void grpc_transport_ping(grpc_transport *transport, grpc_closure *cb);
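
The ping entry point above now takes the renamed closure type directly; a minimal hedged sketch of requesting a ping, with the callback name being illustrative:

static void ping_acked(void *arg, int success) { /* response received */ }

static void kick_ping(grpc_transport *transport, grpc_closure *cb) {
  grpc_closure_init(cb, ping_acked, NULL);
  grpc_transport_ping(transport, cb); /* cb runs when the peer responds */
}
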
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,

@ -87,7 +87,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
gpr_slice slice =
gpr_slice_from_copied_buffer(client_payload, client_payload_length);
gpr_slice_buffer outgoing;
grpc_iomgr_closure done_write_closure;
grpc_closure done_write_closure;
grpc_workqueue *workqueue;
hex = gpr_dump(client_payload, client_payload_length,
@ -131,7 +131,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
gpr_slice_buffer_init(&outgoing);
gpr_slice_buffer_add(&outgoing, slice);
grpc_iomgr_closure_init(&done_write_closure, done_write, &a);
grpc_closure_init(&done_write_closure, done_write, &a);
/* Write data */
switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) {

@ -171,7 +171,7 @@ int main(int argc, char **argv) {
grpc_end2end_tests(configs[i]);
}
grpc_workqueue_flush(g_workqueue, 1);
grpc_workqueue_flush(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_shutdown();

@ -157,7 +157,7 @@ int main(int argc, char **argv) {
grpc_end2end_tests(configs[i]);
}
grpc_workqueue_flush(g_workqueue, 1);
grpc_workqueue_flush(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_shutdown();

@ -157,7 +157,7 @@ int main(int argc, char **argv) {
grpc_end2end_tests(configs[i]);
}
grpc_workqueue_flush(g_workqueue, 1);
grpc_workqueue_flush(g_workqueue);
GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
grpc_shutdown();

@ -122,8 +122,8 @@ struct read_and_write_test_state {
int write_done;
gpr_slice_buffer incoming;
gpr_slice_buffer outgoing;
grpc_iomgr_closure done_read;
grpc_iomgr_closure done_write;
grpc_closure done_read;
grpc_closure done_write;
};
static void read_and_write_test_read_handler(void *data, int success) {
@ -227,10 +227,9 @@ static void read_and_write_test(grpc_endpoint_test_config config,
state.write_done = 0;
state.current_read_data = 0;
state.current_write_data = 0;
grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
&state);
grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
&state);
grpc_closure_init(&state.done_read, read_and_write_test_read_handler, &state);
grpc_closure_init(&state.done_write, read_and_write_test_write_handler,
&state);
gpr_slice_buffer_init(&state.outgoing);
gpr_slice_buffer_init(&state.incoming);

@ -99,7 +99,7 @@ typedef struct {
grpc_fd *em_fd; /* listening fd */
ssize_t read_bytes_total; /* total number of received bytes */
int done; /* set to 1 when a server finishes serving */
grpc_iomgr_closure listen_closure;
grpc_closure listen_closure;
} server;
static void server_init(server *sv) {
@ -113,7 +113,7 @@ typedef struct {
server *sv; /* not owned by a single session */
grpc_fd *em_fd; /* fd to read upload bytes */
char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
grpc_iomgr_closure session_read_closure;
grpc_closure session_read_closure;
} session;
/* Called when an upload session can be safely shutdown.
@ -275,7 +275,7 @@ typedef struct {
int client_write_cnt;
int done; /* set to 1 when a client finishes sending */
grpc_iomgr_closure write_closure;
grpc_closure write_closure;
} client;
static void client_init(client *cl) {
@ -422,8 +422,8 @@ static void test_grpc_fd_change(void) {
int sv[2];
char data;
ssize_t result;
grpc_iomgr_closure first_closure;
grpc_iomgr_closure second_closure;
grpc_closure first_closure;
grpc_closure second_closure;
first_closure.cb = first_read_callback;
first_closure.cb_arg = &a;

@ -120,7 +120,7 @@ struct read_socket_state {
size_t read_bytes;
size_t target_read_bytes;
gpr_slice_buffer incoming;
grpc_iomgr_closure read_cb;
grpc_closure read_cb;
};
static size_t count_slices(gpr_slice *slices, size_t nslices,
@ -196,7 +196,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
gpr_slice_buffer_init(&state.incoming);
grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
grpc_closure_init(&state.read_cb, read_cb, &state);
switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
case GRPC_ENDPOINT_DONE:
@ -246,7 +246,7 @@ static void large_read_test(size_t slice_size) {
state.read_bytes = 0;
state.target_read_bytes = (size_t)written_bytes;
gpr_slice_buffer_init(&state.incoming);
grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
grpc_closure_init(&state.read_cb, read_cb, &state);
switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
case GRPC_ENDPOINT_DONE:
@ -377,7 +377,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
gpr_slice *slices;
gpr_uint8 current_data = 0;
gpr_slice_buffer outgoing;
grpc_iomgr_closure write_done_closure;
grpc_closure write_done_closure;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
@ -396,7 +396,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
gpr_slice_buffer_init(&outgoing);
gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
grpc_closure_init(&write_done_closure, write_done, &state);
switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
case GRPC_ENDPOINT_DONE:

@ -49,12 +49,12 @@ static void must_succeed(void *p, int success) {
}
static void test_add_closure(void) {
grpc_iomgr_closure c;
grpc_closure c;
int done = 0;
grpc_workqueue *wq = grpc_workqueue_create();
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
grpc_pollset_worker worker;
grpc_iomgr_closure_init(&c, must_succeed, &done);
grpc_closure_init(&c, must_succeed, &done);
grpc_workqueue_push(wq, &c, 1);
grpc_workqueue_add_to_pollset(wq, &g_pollset);
