diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 726196e9968..711e1054647 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -157,7 +157,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem, channel_data *chand = elem->channel_data; if (op->send_ops) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send(op->send_user_data, 0); + op->on_done_send->cb(op->on_done_send->cb_arg, 0); } if (op->recv_ops) { char status[GPR_LTOA_MIN_BUFSIZE]; @@ -176,10 +176,10 @@ static void handle_op_after_cancellation(grpc_call_element *elem, mdb.deadline = gpr_inf_future; grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv(op->recv_user_data, 1); + op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); } if (op->on_consumed) { - op->on_consumed(op->on_consumed_user_data, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } @@ -266,17 +266,15 @@ static void cc_start_transport_op(grpc_call_element *elem, calld->s.waiting_op.send_ops = op->send_ops; calld->s.waiting_op.is_last_send = op->is_last_send; calld->s.waiting_op.on_done_send = op->on_done_send; - calld->s.waiting_op.send_user_data = op->send_user_data; } if (op->recv_ops) { calld->s.waiting_op.recv_ops = op->recv_ops; calld->s.waiting_op.recv_state = op->recv_state; calld->s.waiting_op.on_done_recv = op->on_done_recv; - calld->s.waiting_op.recv_user_data = op->recv_user_data; } gpr_mu_unlock(&chand->mu); if (op->on_consumed) { - op->on_consumed(op->on_consumed_user_data, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } break; diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 9805f325a64..62a7a1e7d9f 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -43,8 +43,9 @@ typedef struct call_data { int got_initial_metadata; grpc_stream_op_buffer *recv_ops; - void (*on_done_recv)(void *user_data, int success); - void *recv_user_data; + grpc_iomgr_closure *on_done_recv; + + grpc_iomgr_closure hc_on_recv; } call_data; typedef struct channel_data { @@ -84,7 +85,7 @@ static void hc_on_recv(void *user_data, int success) { grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); } } - calld->on_done_recv(calld->recv_user_data, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { @@ -117,9 +118,7 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { /* substitute our callback for the higher callback */ calld->recv_ops = op->recv_ops; calld->on_done_recv = op->on_done_recv; - calld->recv_user_data = op->recv_user_data; - op->on_done_recv = hc_on_recv; - op->recv_user_data = elem; + op->on_done_recv = &calld->hc_on_recv; } } @@ -154,6 +153,8 @@ static void init_call_elem(grpc_call_element *elem, call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; + calld->on_done_recv = NULL; + grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem); if (initial_op) hc_mutate_op(elem, initial_op); } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 11a53b1e70c..e5ce7a5dd87 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -47,8 +47,8 @@ typedef struct call_data { grpc_linked_mdelem status; grpc_stream_op_buffer *recv_ops; - void (*on_done_recv)(void *user_data, int success); - void *recv_user_data; + grpc_iomgr_closure *on_done_recv; + grpc_iomgr_closure hs_on_recv; } call_data; typedef struct channel_data { @@ -174,7 +174,7 @@ static void hs_on_recv(void *user_data, int success) { } } } - calld->on_done_recv(calld->recv_user_data, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { @@ -200,9 +200,7 @@ static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { /* substitute our callback for the higher callback */ calld->recv_ops = op->recv_ops; calld->on_done_recv = op->on_done_recv; - calld->recv_user_data = op->recv_user_data; - op->on_done_recv = hs_on_recv; - op->recv_user_data = elem; + op->on_done_recv = &calld->hs_on_recv; } } @@ -238,6 +236,7 @@ static void init_call_elem(grpc_call_element *elem, call_data *calld = elem->call_data; /* initialize members */ memset(calld, 0, sizeof(*calld)); + grpc_iomgr_closure_init(&calld->hs_on_recv, hs_on_recv, elem); if (initial_op) hs_mutate_op(elem, initial_op); } diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index a10e481e481..265c817db1d 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -73,4 +73,6 @@ void grpc_iomgr_shutdown(void); * Can be called from within a callback or from anywhere else */ void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); + #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 641c1cd4359..872705e996d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -237,6 +237,9 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; grpc_iomgr_closure destroy_closure; + grpc_iomgr_closure on_done_recv; + grpc_iomgr_closure on_done_send; + grpc_iomgr_closure on_done_bind; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -255,6 +258,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); static void finish_read_ops(grpc_call *call); static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, const char *description); +static void finished_loose_op(void *call, int success); static void lock(grpc_call *call); static void unlock(grpc_call *call); @@ -298,6 +302,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, grpc_sopb_init(&call->send_ops); grpc_sopb_init(&call->recv_ops); gpr_slice_buffer_init(&call->incoming_message); + grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call); + grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call); + grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call); /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */ gpr_ref_init(&call->internal_refcount, 2); /* server hack: start reads immediately so we can get initial metadata. @@ -306,8 +313,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, memset(&initial_op, 0, sizeof(initial_op)); initial_op.recv_ops = &call->recv_ops; initial_op.recv_state = &call->recv_state; - initial_op.on_done_recv = call_on_done_recv; - initial_op.recv_user_data = call; + initial_op.on_done_recv = &call->on_done_recv; initial_op.context = call->context; call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); @@ -460,8 +466,7 @@ static void unlock(grpc_call *call) { if (!call->receiving && need_more_data(call)) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; - op.on_done_recv = call_on_done_recv; - op.recv_user_data = call; + op.on_done_recv = &call->on_done_recv; call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; @@ -929,8 +934,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { break; } if (op->send_ops) { - op->on_done_send = call_on_done_send; - op->send_user_data = call; + op->on_done_send = &call->on_done_send; } return op->send_ops != NULL; } @@ -1105,14 +1109,31 @@ static void finished_loose_op(void *call, int success_ignored) { GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0); } +typedef struct { + grpc_call *call; + grpc_iomgr_closure closure; +} finished_loose_op_allocated_args; + +static void finished_loose_op_allocated(void *alloc, int success) { + finished_loose_op_allocated_args *args = alloc; + finished_loose_op(args->call, success); + gpr_free(args); +} + static void execute_op(grpc_call *call, grpc_transport_op *op) { grpc_call_element *elem; GPR_ASSERT(op->on_consumed == NULL); if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) { GRPC_CALL_INTERNAL_REF(call, "loose-op"); - op->on_consumed = finished_loose_op; - op->on_consumed_user_data = call; + if (op->bind_pollset) { + op->on_consumed = &call->on_done_bind; + } else { + finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); + args->call = call; + grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args); + op->on_consumed = &args->closure; + } } elem = CALL_ELEM_FROM_CALL(call, 0); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b667128aef2..85e1ab5554c 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -56,7 +56,7 @@ static void lame_start_transport_op(grpc_call_element *elem, GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->send_ops) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send(op->send_user_data, 0); + op->on_done_send->cb(op->on_done_send->cb_arg, 0); } if (op->recv_ops) { char tmp[GPR_LTOA_MIN_BUFSIZE]; @@ -75,10 +75,10 @@ static void lame_start_transport_op(grpc_call_element *elem, mdb.deadline = gpr_inf_future; grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv(op->recv_user_data, 1); + op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); } if (op->on_consumed) { - op->on_consumed(op->on_consumed_user_data, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 3671efe0d0d..a76a6c78120 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -183,9 +183,9 @@ struct call_data { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; - void (*on_done_recv)(void *user_data, int success); - void *recv_user_data; + grpc_iomgr_closure *on_done_recv; + grpc_iomgr_closure server_on_recv; grpc_iomgr_closure kill_zombie_closure; call_data **root[CALL_LIST_COUNT]; @@ -503,7 +503,7 @@ static void server_on_recv(void *ptr, int success) { break; } - calld->on_done_recv(calld->recv_user_data, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { @@ -514,9 +514,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { calld->recv_ops = op->recv_ops; calld->recv_state = op->recv_state; calld->on_done_recv = op->on_done_recv; - calld->recv_user_data = op->recv_user_data; - op->on_done_recv = server_on_recv; - op->recv_user_data = elem; + op->on_done_recv = &calld->server_on_recv; } } @@ -612,6 +610,8 @@ static void init_call_elem(grpc_call_element *elem, calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); + grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); + gpr_mu_lock(&chand->server->mu); call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); chand->num_calls++; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c4fa13c86c8..f354c9441a1 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -207,18 +207,6 @@ typedef struct { gpr_slice debug; } pending_goaway; -typedef struct { - void (*cb)(void *user_data, int success); - void *user_data; - int success; -} op_closure; - -typedef struct { - op_closure *callbacks; - size_t count; - size_t capacity; -} op_closure_array; - struct transport { grpc_transport base; /* must be first */ grpc_endpoint *ep; @@ -237,10 +225,6 @@ struct transport { gpr_uint8 closed; error_state error_state; - /* queued callbacks */ - op_closure_array pending_callbacks; - op_closure_array executing_callbacks; - /* stream indexing */ gpr_uint32 next_stream_id; gpr_uint32 last_incoming_stream_id; @@ -266,9 +250,6 @@ struct transport { gpr_uint32 incoming_frame_size; gpr_uint32 incoming_stream_id; - /* hpack encoding */ - grpc_chttp2_hpack_compressor hpack_compressor; - /* various parsers */ grpc_chttp2_hpack_parser hpack_parser; /* simple one shot parsers */ @@ -313,6 +294,8 @@ struct transport { struct { /** data to write next write */ gpr_slice_buffer qbuf; + /* queued callbacks */ + grpc_iomgr_closure *pending_closures; } global; struct { @@ -322,6 +305,8 @@ struct transport { grpc_iomgr_closure action; /** data to write now */ gpr_slice_buffer outbuf; + /* hpack encoding */ + grpc_chttp2_hpack_compressor hpack_compressor; } writing; struct { @@ -334,18 +319,26 @@ struct transport { struct { /** is a thread currently performing channel callbacks */ gpr_uint8 executing; + /** transport channel-level callback */ const grpc_transport_callbacks *cb; + /** user data for cb calls */ void *cb_user_data; + /** closure for notifying transport closure */ + grpc_iomgr_closure notify_closed; } channel_callback; }; struct stream { struct { - int unused; + grpc_iomgr_closure *send_done_closure; + grpc_iomgr_closure *recv_done_closure; } global; struct { - int unused; + /* sops that have passed flow control to be written */ + grpc_stream_op_buffer sopb; + /* how strongly should we indicate closure with the next write */ + send_closed send_closed; } writing; struct { @@ -361,13 +354,9 @@ struct stream { 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ write_state write_state; - send_closed send_closed; gpr_uint8 read_closed; gpr_uint8 cancelled; - op_closure send_done_closure; - op_closure recv_done_closure; - stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; @@ -383,8 +372,6 @@ struct stream { grpc_stream_op_buffer *incoming_sopb; grpc_stream_state *publish_state; grpc_stream_state published_state; - /* sops that have passed flow control to be written */ - grpc_stream_op_buffer writing_sopb; grpc_chttp2_data_parser parser; @@ -392,33 +379,21 @@ struct stream { grpc_stream_op_buffer callback_sopb; }; -#define MAX_POST_ACTIONS 8 - -typedef struct { - size_t num_post_actions; - grpc_iomgr_closure *post_actions[MAX_POST_ACTIONS]; -} unlock_ctx; - static const grpc_transport_vtable vtable; static void push_setting(transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); -static int prepare_callbacks(transport *t); -static void run_callbacks(transport *t); -static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb); - -static int prepare_write(transport *t); -static void perform_write(transport *t, grpc_endpoint *ep); - static void lock(transport *t); static void unlock(transport *t); -static void unlock_check_writes(transport* t, unlock_ctx *uctx); - static void unlock_check_cancellations(transport* t, unlock_ctx *uctx); - static void unlock_check_parser(transport* t, unlock_ctx *uctx); - static void unlock_check_op_callbacks(transport* t, unlock_ctx *uctx); - static void unlock_check_channel_callbacks(transport* t, unlock_ctx *uctx); +static void unlock_check_writes(transport* t); + static void unlock_check_cancellations(transport* t); + static void unlock_check_parser(transport* t); + static void unlock_check_channel_callbacks(transport* t); + +static void writing_action(void *t, int iomgr_success_ignored); +static void notify_closed(void *t, int iomgr_success_ignored); static void drop_connection(transport *t); static void end_all_the_calls(transport *t); @@ -435,7 +410,6 @@ static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst); -static void finalize_cancellations(transport *t); static stream *lookup_stream(transport *t, gpr_uint32 id); static void remove_from_stream_map(transport *t, stream *s); static void maybe_start_some_streams(transport *t); @@ -445,10 +419,9 @@ static void become_skip_parser(transport *t); static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error); -static void schedule_cb(transport *t, op_closure closure, int success); +static void schedule_cb(transport *t, grpc_iomgr_closure *closure, int success); static void maybe_finish_read(transport *t, stream *s, int is_parser); static void maybe_join_window_updates(transport *t, stream *s); -static void finish_reads(transport *t); static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op); static void add_metadata_batch(transport *t, stream *s); @@ -471,10 +444,12 @@ static void destruct_transport(transport *t) { GPR_ASSERT(t->ep == NULL); gpr_slice_buffer_destroy(&t->global.qbuf); + gpr_slice_buffer_destroy(&t->writing.outbuf); + grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor); + gpr_slice_buffer_destroy(&t->parsing.qbuf); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); - grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); grpc_mdstr_unref(t->str_grpc_timeout); @@ -499,9 +474,6 @@ static void destruct_transport(transport *t) { } gpr_free(t->pings); - gpr_free(t->pending_callbacks.callbacks); - gpr_free(t->executing_callbacks.callbacks); - for (i = 0; i < t->num_pending_goaways; i++) { gpr_slice_unref(t->pending_goaways[i].debug); } @@ -552,11 +524,13 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; t->ping_counter = gpr_now().tv_nsec; - grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); grpc_chttp2_goaway_parser_init(&t->goaway_parser); gpr_slice_buffer_init(&t->global.qbuf); gpr_slice_buffer_init(&t->writing.outbuf); + grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx); + grpc_iomgr_closure_init(&t->writing.action, writing_action, t); gpr_slice_buffer_init(&t->parsing.qbuf); + grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t); grpc_sopb_init(&t->nuke_later_sopb); grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context); if (is_client) { @@ -664,7 +638,7 @@ static void destroy_transport(grpc_transport *gt) { It's shutdown path, so I don't believe an extra lock pair is going to be problematic for performance. */ lock(t); - GPR_ASSERT(!t->channel_callback.cb); + GPR_ASSERT(t->error_state == ERROR_STATE_NOTIFIED); unlock(t); unref_transport(t); @@ -722,7 +696,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, } s->incoming_deadline = gpr_inf_future; - grpc_sopb_init(&s->writing_sopb); + grpc_sopb_init(&s->writing.sopb); grpc_sopb_init(&s->callback_sopb); grpc_chttp2_data_parser_init(&s->parser); @@ -755,7 +729,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { GPR_ASSERT(s->outgoing_sopb == NULL); GPR_ASSERT(s->incoming_sopb == NULL); - grpc_sopb_destroy(&s->writing_sopb); + grpc_sopb_destroy(&s->writing.sopb); grpc_sopb_destroy(&s->callback_sopb); grpc_chttp2_data_parser_destroy(&s->parser); for (i = 0; i < s->incoming_metadata_count; i++) { @@ -771,9 +745,11 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { * LIST MANAGEMENT */ +#if 0 static int stream_list_empty(transport *t, stream_list_id id) { return t->lists[id].head == NULL; } +#endif static stream *stream_list_remove_head(transport *t, stream_list_id id) { stream *s = t->lists[id].head; @@ -852,25 +828,24 @@ static void remove_from_stream_map(transport *t, stream *s) { static void lock(transport *t) { gpr_mu_lock(&t->mu); } static void unlock(transport *t) { - unlock_ctx uctx; - size_t i; + grpc_iomgr_closure *run_closures; - memset(&uctx, 0, sizeof(uctx)); + unlock_check_writes(t); + unlock_check_cancellations(t); + unlock_check_parser(t); + unlock_check_channel_callbacks(t); - unlock_check_writes(t, &uctx); - unlock_check_cancellations(t, &uctx); - unlock_check_parser(t, &uctx); - unlock_check_op_callbacks(t, &uctx); - unlock_check_channel_callbacks(t, &uctx); + run_closures = t->global.pending_closures; + t->global.pending_closures = NULL; gpr_mu_unlock(&t->mu); - for (i = 0; i < uctx.num_post_actions; i++) { - grpc_iomgr_closure* closure = uctx.post_actions[i]; - closure->cb(closure->cb_arg, 1); + while (run_closures) { + grpc_iomgr_closure *next = run_closures->next; + run_closures->cb(run_closures->cb_arg, run_closures->success); + run_closures = next; } - #if 0 int start_write = 0; int perform_callbacks = 0; @@ -994,7 +969,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id, } } -static void unlock_check_writes(transport *t, unlock_ctx *uctx) { +static void unlock_check_writes(transport *t) { stream *s; gpr_uint32 window_delta; @@ -1023,7 +998,7 @@ static void unlock_check_writes(transport *t, unlock_ctx *uctx) { s->outgoing_window > 0) { window_delta = grpc_chttp2_preencode( s->outgoing_sopb->ops, &s->outgoing_sopb->nops, - GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); + GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb); FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); t->outgoing_window -= window_delta; @@ -1032,19 +1007,19 @@ static void unlock_check_writes(transport *t, unlock_ctx *uctx) { if (s->write_state == WRITE_STATE_QUEUED_CLOSE && s->outgoing_sopb->nops == 0) { if (!t->is_client && !s->read_closed) { - s->send_closed = SEND_CLOSED_WITH_RST_STREAM; + s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM; } else { - s->send_closed = SEND_CLOSED; + s->writing.send_closed = SEND_CLOSED; } } - if (s->writing_sopb.nops > 0 || s->send_closed) { + if (s->writing.sopb.nops > 0 || s->writing.send_closed) { stream_list_join(t, s, WRITING); } /* we should either exhaust window or have no ops left, but not both */ if (s->outgoing_sopb->nops == 0) { s->outgoing_sopb = NULL; - schedule_cb(t, s->send_done_closure, 1); + schedule_cb(t, s->global.send_done_closure, 1); } else if (s->outgoing_window) { stream_list_add_tail(t, s, WRITABLE); } @@ -1075,30 +1050,31 @@ static void unlock_check_writes(transport *t, unlock_ctx *uctx) { } if (t->writing.outbuf.length > 0) { - uctx->post_actions[uctx->num_post_actions++] = &t->writing.action; t->writing.executing = 1; + ref_transport(t); + schedule_cb(t, &t->writing.action, 1); } } -static void finalize_outbuf(transport *t) { +static void writing_finalize_outbuf(transport *t) { stream *s; while ((s = stream_list_remove_head(t, WRITING))) { - grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, - s->send_closed != DONT_SEND_CLOSED, s->id, - &t->hpack_compressor, &t->outbuf); - s->writing_sopb.nops = 0; - if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) { - gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create( + grpc_chttp2_encode(s->writing.sopb.ops, s->writing.sopb.nops, + s->writing.send_closed != DONT_SEND_CLOSED, s->id, + &t->writing.hpack_compressor, &t->writing.outbuf); + s->writing.sopb.nops = 0; + if (s->writing.send_closed == SEND_CLOSED_WITH_RST_STREAM) { + gpr_slice_buffer_add(&t->writing.outbuf, grpc_chttp2_rst_stream_create( s->id, GRPC_CHTTP2_NO_ERROR)); } - if (s->send_closed != DONT_SEND_CLOSED) { + if (s->writing.send_closed != DONT_SEND_CLOSED) { stream_list_join(t, s, WRITTEN_CLOSED); } } } -static void finish_write_common(transport *t, int success) { +static void writing_finish(transport *t, int success) { stream *s; lock(t); @@ -1112,11 +1088,11 @@ static void finish_write_common(transport *t, int success) { } maybe_finish_read(t, s, 0); } - t->outbuf.count = 0; - t->outbuf.length = 0; + t->writing.outbuf.count = 0; + t->writing.outbuf.length = 0; /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ - t->writing = 0; + t->writing.executing = 0; if (t->destroying) { gpr_cv_signal(&t->cv); } @@ -1130,27 +1106,35 @@ static void finish_write_common(transport *t, int success) { unref_transport(t); } -static void finish_write(void *tp, grpc_endpoint_cb_status error) { +static void writing_finish_write_cb(void *tp, grpc_endpoint_cb_status error) { transport *t = tp; - finish_write_common(t, error == GRPC_ENDPOINT_CB_OK); + writing_finish(t, error == GRPC_ENDPOINT_CB_OK); } -static void perform_write(transport *t, grpc_endpoint *ep) { - finalize_outbuf(t); +static void writing_action(void *gt, int iomgr_success_ignored) { + transport *t = gt; - GPR_ASSERT(t->outbuf.count > 0); + writing_finalize_outbuf(t); - switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count, - finish_write, t)) { + GPR_ASSERT(t->writing.outbuf.count > 0); + + switch (grpc_endpoint_write(t->ep, t->writing.outbuf.slices, t->writing.outbuf.count, + writing_finish_write_cb, t)) { case GRPC_ENDPOINT_WRITE_DONE: - finish_write_common(t, 1); + writing_finish(t, 1); break; case GRPC_ENDPOINT_WRITE_ERROR: - finish_write_common(t, 0); + writing_finish(t, 0); break; case GRPC_ENDPOINT_WRITE_PENDING: break; } + + lock(t); + t->writing.executing = 0; + unlock(t); + + unref_transport(t); } static void add_goaway(transport *t, gpr_uint32 goaway_error, @@ -1168,7 +1152,7 @@ static void add_goaway(transport *t, gpr_uint32 goaway_error, static void maybe_start_some_streams(transport *t) { /* start streams where we have free stream ids and free concurrency */ - while (!t->parsing && t->next_stream_id <= MAX_CLIENT_STREAM_ID && + while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID && grpc_chttp2_stream_map_size(&t->stream_map) < t->settings[PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { @@ -1216,8 +1200,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { if (op->send_ops) { GPR_ASSERT(s->outgoing_sopb == NULL); - s->send_done_closure.cb = op->on_done_send; - s->send_done_closure.user_data = op->send_user_data; + s->global.send_done_closure = op->on_done_send; if (!s->cancelled) { s->outgoing_sopb = op->send_ops; if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) { @@ -1234,15 +1217,14 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { } } else { schedule_nuke_sopb(t, op->send_ops); - schedule_cb(t, s->send_done_closure, 0); + schedule_cb(t, s->global.send_done_closure, 0); } } if (op->recv_ops) { GPR_ASSERT(s->incoming_sopb == NULL); GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED); - s->recv_done_closure.cb = op->on_done_recv; - s->recv_done_closure.user_data = op->recv_user_data; + s->global.recv_done_closure = op->on_done_recv; s->incoming_sopb = op->recv_ops; s->incoming_sopb->nops = 0; s->publish_state = op->recv_state; @@ -1257,10 +1239,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { } if (op->on_consumed) { - op_closure c; - c.cb = op->on_consumed; - c.user_data = op->on_consumed_user_data; - schedule_cb(t, c, 1); + schedule_cb(t, op->on_consumed, 1); } } @@ -1296,7 +1275,7 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data), p->id[7] = t->ping_counter & 0xff; p->cb = cb; p->user_data = user_data; - gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); + gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); unlock(t); } @@ -1304,9 +1283,13 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data), * INPUT PROCESSING */ -static void finalize_cancellations(transport *t) { +static void unlock_check_cancellations(transport *t) { stream *s; + if (t->writing.executing) { + return; + } + while ((s = stream_list_remove_head(t, CANCELLED))) { s->read_closed = 1; s->write_state = WRITE_STATE_SENT_CLOSE; @@ -1342,7 +1325,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, schedule_nuke_sopb(t, s->outgoing_sopb); s->outgoing_sopb = NULL; stream_list_remove(t, s, WRITABLE); - schedule_cb(t, s->send_done_closure, 0); + schedule_cb(t, s->global.send_done_closure, 0); } } if (s->cancelled) { @@ -1383,7 +1366,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, } if (!id) send_rst = 0; if (send_rst) { - gpr_slice_buffer_add(&t->qbuf, + gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_rst_stream_create(id, error_code)); } if (optional_message) { @@ -1434,7 +1417,7 @@ static void maybe_finish_read(transport *t, stream *s, int is_parser) { } static void maybe_join_window_updates(transport *t, stream *s) { - if (t->parsing) { + if (t->parsing.executing) { stream_list_join(t, s, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE); return; } @@ -1610,7 +1593,7 @@ static int init_header_frame_parser(transport *t, int is_continuation) { } t->incoming_stream = NULL; /* if stream is accepted, we set incoming_stream in init_stream */ - t->cb->accept_stream(t->cb_user_data, &t->base, + t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, &t->base, (void *)(gpr_uintptr)t->incoming_stream_id); s = t->incoming_stream; if (!s) { @@ -1795,11 +1778,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { maybe_finish_read(t, t->incoming_stream, 1); } if (st.ack_settings) { - gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); + gpr_slice_buffer_add(&t->parsing.qbuf, grpc_chttp2_settings_ack_create()); } if (st.send_ping_ack) { gpr_slice_buffer_add( - &t->qbuf, + &t->parsing.qbuf, grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes)); } if (st.goaway) { @@ -2056,7 +2039,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, lock(t); drop_connection(t); t->reading = 0; - if (!t->writing && t->ep) { + if (!t->writing.executing && t->ep) { grpc_endpoint_destroy(t->ep); t->ep = NULL; unref_transport(t); /* safe as we still have a ref for read */ @@ -2065,16 +2048,16 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, unref_transport(t); break; case GRPC_ENDPOINT_CB_OK: - gpr_mu_lock(&t->mu); - GPR_ASSERT(!t->parsing); - t->parsing = 1; - gpr_mu_unlock(&t->mu); - if (t->cb) { + lock(t); + GPR_ASSERT(!t->parsing.executing); + t->parsing.executing = 1; + if (t->error_state == ERROR_STATE_NONE) { + gpr_mu_unlock(&t->mu); for (i = 0; i < nslices && process_read(t, slices[i]); i++) ; + gpr_mu_unlock(&t->mu); } - lock(t); - t->parsing = 0; + t->parsing.executing = 0; while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) { maybe_finish_read(t, s, 0); } @@ -2176,9 +2159,13 @@ static void patch_metadata_ops(stream *s) { } } -static void finish_reads(transport *t) { +static void unlock_check_parser(transport *t) { stream *s; + if (t->parsing.executing) { + return; + } + while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { int publish = 0; GPR_ASSERT(s->incoming_sopb); @@ -2200,42 +2187,87 @@ static void finish_reads(transport *t) { patch_metadata_ops(s); } s->incoming_sopb = NULL; - schedule_cb(t, s->recv_done_closure, 1); + schedule_cb(t, s->global.recv_done_closure, 1); } } } -static void schedule_cb(transport *t, op_closure closure, int success) { - if (t->pending_callbacks.capacity == t->pending_callbacks.count) { - t->pending_callbacks.capacity = - GPR_MAX(t->pending_callbacks.capacity * 2, 8); - t->pending_callbacks.callbacks = - gpr_realloc(t->pending_callbacks.callbacks, - t->pending_callbacks.capacity * - sizeof(*t->pending_callbacks.callbacks)); +typedef struct { + transport *t; + pending_goaway *goaways; + size_t num_goaways; + grpc_iomgr_closure closure; +} notify_goaways_args; + +static void notify_goaways(void *p, int iomgr_success_ignored) { + size_t i; + notify_goaways_args *a = p; + transport *t = a->t; + + for (i = 0; i < a->num_goaways; i++) { + t->channel_callback.cb->goaway( + t->channel_callback.cb_user_data, + &t->base, + a->goaways[i].status, + a->goaways[i].debug); } - closure.success = success; - t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure; -} -static int prepare_callbacks(transport *t) { - op_closure_array temp = t->pending_callbacks; - t->pending_callbacks = t->executing_callbacks; - t->executing_callbacks = temp; - return t->executing_callbacks.count > 0; + gpr_free(a->goaways); + gpr_free(a); + + lock(t); + t->channel_callback.executing = 0; + unlock(t); + + unref_transport(t); } -static void run_callbacks(transport *t) { - size_t i; - for (i = 0; i < t->executing_callbacks.count; i++) { - op_closure c = t->executing_callbacks.callbacks[i]; - c.cb(c.user_data, c.success); +static void unlock_check_channel_callbacks(transport *t) { + if (t->channel_callback.executing) { + return; } - t->executing_callbacks.count = 0; + if (t->parsing.executing) { + return; + } + if (t->num_pending_goaways) { + notify_goaways_args *a = gpr_malloc(sizeof(*a)); + a->goaways = t->pending_goaways; + a->num_goaways = t->num_pending_goaways; + t->pending_goaways = NULL; + t->num_pending_goaways = 0; + t->cap_pending_goaways = 0; + t->channel_callback.executing = 1; + grpc_iomgr_closure_init(&a->closure, notify_goaways, a); + ref_transport(t); + schedule_cb(t, &a->closure, 1); + return; + } + if (t->writing.executing) { + return; + } + if (t->error_state == ERROR_STATE_SEEN) { + t->error_state = ERROR_STATE_NOTIFIED; + t->channel_callback.executing = 1; + ref_transport(t); + schedule_cb(t, &t->channel_callback.notify_closed, 1); + } +} + +static void notify_closed(void *gt, int iomgr_success_ignored) { + transport *t = gt; + t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base); + + lock(t); + t->channel_callback.executing = 0; + unlock(t); + + unref_transport(t); } -static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) { - cb->closed(t->cb_user_data, &t->base); +static void schedule_cb(transport *t, grpc_iomgr_closure *closure, int success) { + closure->success = success; + closure->next = t->global.pending_closures; + t->global.pending_closures = closure; } /* diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index a9948cd4b2a..167970d992e 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -98,13 +98,13 @@ void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup, void grpc_transport_op_finish_with_failure(grpc_transport_op *op) { if (op->send_ops) { - op->on_done_send(op->send_user_data, 0); + op->on_done_send->cb(op->on_done_send->cb_arg, 0); } if (op->recv_ops) { - op->on_done_recv(op->recv_user_data, 0); + op->on_done_recv->cb(op->on_done_recv->cb_arg, 0); } if (op->on_consumed) { - op->on_consumed(op->on_consumed_user_data, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7f60fdc0374..9d43581a0ac 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -64,18 +64,15 @@ typedef enum grpc_stream_state { /* Transport op: a set of operations to perform on a transport */ typedef struct grpc_transport_op { - void (*on_consumed)(void *user_data, int success); - void *on_consumed_user_data; + grpc_iomgr_closure *on_consumed; grpc_stream_op_buffer *send_ops; int is_last_send; - void (*on_done_send)(void *user_data, int success); - void *send_user_data; + grpc_iomgr_closure *on_done_send; grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; - void (*on_done_recv)(void *user_data, int success); - void *recv_user_data; + grpc_iomgr_closure *on_done_recv; grpc_pollset *bind_pollset;