diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index f774cce6901..fb72fd693a1 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -372,7 +372,7 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_combiner_execute(exec_ctx, t->combiner, grpc_closure_create(destroy_transport_locked, t), - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, false); } static void close_transport_locked(grpc_exec_ctx *exec_ctx, @@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->destroy_stream_arg = and_free_memory; grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s); grpc_combiner_execute(exec_ctx, t->combiner, &s->destroy_stream, - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, false); GPR_TIMER_END("destroy_stream", 0); } @@ -546,13 +546,15 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING; + gpr_log(GPR_DEBUG, "W:%s:%p: IDLE -> WRITING", t->is_client ? "CLIENT" : "SERVER", t); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action_begin_locked, - GRPC_ERROR_NONE, false); + GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME; + gpr_log(GPR_DEBUG, "W:%s:%p: WRITING -> WRITING_MORE", t->is_client ? "CLIENT" : "SERVER", t); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: break; @@ -576,8 +578,11 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, grpc_chttp2_transport *t = gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) { + t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING; + gpr_log(GPR_DEBUG, "W:%s:%p: WRITING|WRITING_MORE -> WRITING", t->is_client ? "CLIENT" : "SERVER", t); grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL); } else { + gpr_log(GPR_DEBUG, "W:%s:%p: WRITING|WRITING_MORE -> IDLE", t->is_client ? "CLIENT" : "SERVER", t); t->write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); } @@ -596,7 +601,7 @@ static void write_action_end(grpc_exec_ctx *exec_ctx, void *gt, grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("write_action_end", 0); grpc_combiner_execute(exec_ctx, t->combiner, &t->write_action_end_locked, - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error), false); GPR_TIMER_END("write_action_end", 0); } @@ -617,13 +622,15 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, case GRPC_CHTTP2_WRITE_STATE_WRITING: GPR_TIMER_MARK("state=writing", 0); t->write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; + gpr_log(GPR_DEBUG, "W:%s:%p: WRITING -> IDLE", t->is_client ? "CLIENT" : "SERVER", t); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: GPR_TIMER_MARK("state=writing_stale_with_poller", 0); t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING; + gpr_log(GPR_DEBUG, "W:%s:%p: WRITING_MORE -> WRITING", t->is_client ? "CLIENT" : "SERVER", t); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action, - GRPC_ERROR_NONE, false); + grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action_begin_locked, + GRPC_ERROR_NONE); break; } @@ -742,20 +749,22 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, closure->next_data.scratch - CLOSURE_BARRIER_FIRST_REF_BIT); closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; if (error != GRPC_ERROR_NONE) { - if (closure->error == GRPC_ERROR_NONE) { - closure->error = + if (closure->error_data.error == GRPC_ERROR_NONE) { + closure->error_data.error = GRPC_ERROR_CREATE("Error in HTTP transport completing operation"); - closure->error = grpc_error_set_str( - closure->error, GRPC_ERROR_STR_TARGET_ADDRESS, t->peer_string); + closure->error_data.error = + grpc_error_set_str(closure->error_data.error, + GRPC_ERROR_STR_TARGET_ADDRESS, t->peer_string); } - closure->error = grpc_error_add_child(closure->error, error); + closure->error_data.error = + grpc_error_add_child(closure->error_data.error, error); } if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) { if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) { grpc_transport_move_stats(&s->stats, s->collecting_stats); s->collecting_stats = NULL; } - grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, closure->error_data.error, NULL); } *pclosure = NULL; } @@ -842,7 +851,8 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, grpc_chttp2_stream *s = gs; grpc_chttp2_transport *t = s->t; grpc_combiner_execute(exec_ctx, t->combiner, &s->complete_fetch_locked, - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error), + s->complete_fetch_covered_by_poller); } static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} @@ -873,7 +883,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, /* use final_data as a barrier until enqueue time; the inital counter is dropped at the end of this function */ on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT; - on_complete->error = GRPC_ERROR_NONE; + on_complete->error_data.error = GRPC_ERROR_NONE; if (op->collect_stats != NULL) { GPR_ASSERT(s->collecting_stats == NULL); @@ -959,6 +969,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->fetched_send_message_length = 0; s->fetching_slice_end_offset = (ssize_t)s->flow_controlled_buffer.length + (ssize_t)len; + s->complete_fetch_covered_by_poller = op->covered_by_poller; if (flags & GRPC_WRITE_BUFFER_HINT) { s->fetching_slice_end_offset -= 65536; } @@ -1063,7 +1074,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, op->transport_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure, - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, op->covered_by_poller); GPR_TIMER_END("perform_stream_op", 0); } @@ -1173,7 +1184,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, op); GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure, - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, false); } /******************************************************************************* @@ -1265,7 +1276,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } if (s->data_parser.parsing_frame != NULL) { grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error), 0); + exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error)); s->data_parser.parsing_frame = NULL; } @@ -1637,7 +1648,7 @@ static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("reading_action", 0); grpc_chttp2_transport *t = tp; grpc_combiner_execute(exec_ctx, t->combiner, &t->read_action_locked, - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error), false); GPR_TIMER_END("reading_action", 0); } @@ -1868,7 +1879,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked, bs); grpc_combiner_execute(exec_ctx, bs->transport->combiner, - &bs->next_action.closure, GRPC_ERROR_NONE); + &bs->next_action.closure, GRPC_ERROR_NONE, false); GPR_TIMER_END("incoming_byte_stream_next", 0); return 0; } @@ -1893,7 +1904,7 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked, bs); grpc_combiner_execute(exec_ctx, bs->transport->combiner, &bs->destroy_action, - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, false); GPR_TIMER_END("incoming_byte_stream_destroy", 0); } @@ -1916,9 +1927,9 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&bs->slice_mu); } -static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx, - void *bsp, grpc_error *error) { - grpc_chttp2_incoming_byte_stream *bs = bsp; +void grpc_chttp2_incoming_byte_stream_finished( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, + grpc_error *error) { if (error != GRPC_ERROR_NONE) { grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); bs->on_next = NULL; @@ -1928,21 +1939,6 @@ static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx, incoming_byte_stream_unref(exec_ctx, bs); } -void grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error, int from_parsing_thread) { - GPR_TIMER_BEGIN("grpc_chttp2_incoming_byte_stream_finished", 0); - if (from_parsing_thread) { - grpc_closure_init(&bs->finished_action, - incoming_byte_stream_finished_locked, bs); - grpc_combiner_execute(exec_ctx, bs->transport->combiner, - &bs->finished_action, GRPC_ERROR_REF(error)); - } else { - incoming_byte_stream_finished_locked(exec_ctx, bs, error); - } - GPR_TIMER_END("grpc_chttp2_incoming_byte_stream_finished", 0); -} - grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags) { diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index e340b2fb068..bcb0ab0f993 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -53,8 +53,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *parser) { if (parser->parsing_frame != NULL) { grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, parser->parsing_frame, GRPC_ERROR_CREATE("Parser destroyed"), - 1); + exec_ctx, parser->parsing_frame, GRPC_ERROR_CREATE("Parser destroyed")); } GRPC_ERROR_UNREF(parser->error); } @@ -245,7 +244,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, - GRPC_ERROR_NONE, 1); + GRPC_ERROR_NONE); p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_ERROR_NONE; @@ -256,7 +255,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, - GRPC_ERROR_NONE, 1); + GRPC_ERROR_NONE); p->parsing_frame = NULL; cur += p->frame_size; goto fh_0; /* loop */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 0c218b79de0..0d15a569514 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -349,6 +349,7 @@ struct grpc_chttp2_stream { uint32_t fetched_send_message_length; gpr_slice fetching_slice; int64_t fetching_slice_end_offset; + bool complete_fetch_covered_by_poller; grpc_closure complete_fetch; grpc_closure complete_fetch_locked; grpc_closure *fetching_send_message_finished; @@ -643,7 +644,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, gpr_slice slice); void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error, int from_parsing_thread); + grpc_error *error); void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, const uint8_t *opaque_8bytes); diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 7a42c2a58a9..9d09e0c7c25 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -163,9 +163,8 @@ void grpc_chttp2_list_add_check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { if (!t->check_read_ops_scheduled) { GRPC_CHTTP2_REF_TRANSPORT(t, "initiate_read_flush_locked"); - grpc_combiner_execute_finally(exec_ctx, t->combiner, - &t->read_action_flush_locked, GRPC_ERROR_NONE, - false); + grpc_combiner_execute_finally( + exec_ctx, t->combiner, &t->read_action_flush_locked, GRPC_ERROR_NONE); t->check_read_ops_scheduled = true; } stream_list_add(t, s, GRPC_CHTTP2_LIST_CHECK_READ_OPS); diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 1ba0a5c1415..6200cda5dcf 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -51,7 +51,7 @@ void grpc_closure_list_append(grpc_closure_list *closure_list, GRPC_ERROR_UNREF(error); return; } - closure->error = error; + closure->error_data.error = error; closure->next_data.next = NULL; if (closure_list->head == NULL) { closure_list->head = closure; @@ -64,8 +64,8 @@ void grpc_closure_list_append(grpc_closure_list *closure_list, void grpc_closure_list_fail_all(grpc_closure_list *list, grpc_error *forced_failure) { for (grpc_closure *c = list->head; c != NULL; c = c->next_data.next) { - if (c->error == GRPC_ERROR_NONE) { - c->error = GRPC_ERROR_REF(forced_failure); + if (c->error_data.error == GRPC_ERROR_NONE) { + c->error_data.error = GRPC_ERROR_REF(forced_failure); } } GRPC_ERROR_UNREF(forced_failure); diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index c1a22b60215..bf7c006097d 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -76,7 +76,10 @@ struct grpc_closure { void *cb_arg; /** Once queued, the result of the closure. Before then: scratch space */ - grpc_error *error; + union { + grpc_error *error; + uintptr_t scratch; + } error_data; }; /** Initializes \a closure with \a cb and \a cb_arg. */ diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index f1a2b29519e..40be4dea7b4 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -58,7 +58,9 @@ struct grpc_combiner { // lower bit - zero if orphaned // other bits - number of items queued on the lock gpr_atm state; - bool take_async_break_before_final_list; + // number of elements in the list that are covered by a poller: if >0, we can + // offload safely + gpr_atm covered_by_poller; bool time_to_execute_final_list; grpc_closure_list final_list; grpc_closure offload; @@ -66,14 +68,27 @@ struct grpc_combiner { static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); +typedef struct { + grpc_error *error; + bool covered_by_poller; +} error_data; + +static uintptr_t pack_error_data(error_data d) { + return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0); +} + +static error_data unpack_error_data(uintptr_t p) { + return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1}; +} + grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); lock->next_combiner_on_this_exec_ctx = NULL; lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; gpr_atm_no_barrier_store(&lock->state, 1); + gpr_atm_no_barrier_store(&lock->covered_by_poller, 0); gpr_mpscq_init(&lock->queue); - lock->take_async_break_before_final_list = false; grpc_closure_list_init(&lock->final_list); grpc_closure_init(&lock->offload, offload, lock); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); @@ -108,13 +123,18 @@ static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error) { + grpc_closure *cl, grpc_error *error, + bool covered_by_poller) { GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl)); + gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d", lock, cl, covered_by_poller)); GPR_TIMER_BEGIN("combiner.execute", 0); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); GPR_ASSERT(last & 1); // ensure lock has not been destroyed - cl->error = error; + cl->error_data.scratch = + pack_error_data((error_data){error, covered_by_poller}); + if (covered_by_poller) { + gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, 1); + } gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); if (last == 1) { // code will be written when the exec_ctx calls @@ -152,11 +172,12 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } if (lock->optional_workqueue != NULL && - grpc_exec_ctx_ready_to_finish(exec_ctx)) { + grpc_exec_ctx_ready_to_finish(exec_ctx) && + gpr_atm_acq_load(&lock->covered_by_poller) > 0) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); - // this execution context wants to move on, and we have a workqueue (and so - // can help the execution context out): schedule remaining work to be picked - // up on the workqueue + // this execution context wants to move on, and we have a workqueue (and + // so can help the execution context out): schedule remaining work to be + // picked up on the workqueue queue_offload(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; @@ -173,7 +194,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { // queue is in an inconsistant state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); - if (lock->optional_workqueue != NULL) { + if (lock->optional_workqueue != NULL && gpr_atm_acq_load(&lock->covered_by_poller) > 0) { queue_offload(exec_ctx, lock); } GPR_TIMER_END("combiner.continue_exec_ctx", 0); @@ -181,37 +202,28 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } GPR_TIMER_BEGIN("combiner.exec1", 0); grpc_closure *cl = (grpc_closure *)n; - grpc_error *error = cl->error; - cl->cb(exec_ctx, cl->cb_arg, error); - GRPC_ERROR_UNREF(error); + error_data err = unpack_error_data(cl->error_data.scratch); + cl->cb(exec_ctx, cl->cb_arg, err.error); + if (err.covered_by_poller) { + gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, -1); + } + GRPC_ERROR_UNREF(err.error); GPR_TIMER_END("combiner.exec1", 0); } else { - if (lock->take_async_break_before_final_list) { - GPR_TIMER_MARK("async_break", 0); - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p take async break", lock)); - lock->take_async_break_before_final_list = false; - if (lock->optional_workqueue != NULL) { - queue_offload(exec_ctx, lock); - } - GPR_TIMER_END("combiner.continue_exec_ctx", 0); - return true; - } else { - grpc_closure *c = lock->final_list.head; - GPR_ASSERT(c != NULL); - grpc_closure_list_init(&lock->final_list); - lock->take_async_break_before_final_list = false; - int loops = 0; - while (c != NULL) { - GPR_TIMER_BEGIN("combiner.exec_1final", 0); - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error; - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - GPR_TIMER_END("combiner.exec_1final", 0); - } + grpc_closure *c = lock->final_list.head; + GPR_ASSERT(c != NULL); + grpc_closure_list_init(&lock->final_list); + int loops = 0; + while (c != NULL) { + GPR_TIMER_BEGIN("combiner.exec_1final", 0); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + GPR_TIMER_END("combiner.exec_1final", 0); } } @@ -250,35 +262,27 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error) { grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error), false); + GRPC_ERROR_REF(error)); } void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool force_async_break) { - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, - "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p", - lock, closure, force_async_break, exec_ctx->active_combiner)); + grpc_closure *closure, grpc_error *error) { + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_execute_finally c=%p; ac=%p", + lock, closure, exec_ctx->active_combiner)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); grpc_combiner_execute(exec_ctx, lock, - grpc_closure_create(enqueue_finally, closure), error); + grpc_closure_create(enqueue_finally, closure), error, + false); GPR_TIMER_END("combiner.execute_finally", 0); return; } - if (force_async_break) { - lock->take_async_break_before_final_list = true; - } if (grpc_closure_list_empty(lock->final_list)) { gpr_atm_full_fetch_add(&lock->state, 2); } grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } - -void grpc_combiner_force_async_finally(grpc_combiner *lock) { - lock->take_async_break_before_final_list = true; -} diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 28f548b2f50..80ed33c2a7f 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -52,7 +52,8 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); // Execute \a action within the lock. void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error); + grpc_closure *closure, grpc_error *error, + bool covered_by_poller); // Execute \a action within the lock just prior to unlocking. // if \a hint_async_break is additionally set, the combiner is tries to trip // through the workqueue between finishing the primary queue of combined @@ -60,9 +61,7 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, // Takes a very slow and round-about path if not called from a // grpc_combiner_execute closure void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool hint_async_break); -void grpc_combiner_force_async_finally(grpc_combiner *lock); + grpc_closure *closure, grpc_error *error); bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 747b462a6ed..eec32a4f268 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -67,7 +67,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { grpc_closure *next = c->next_data.next; - grpc_error *error = c->error; + grpc_error *error = c->error_data.error; did_something = true; GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); c->cb(exec_ctx, c->cb_arg, error); @@ -87,7 +87,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { if (grpc_exec_ctx_ready_to_finish(exec_ctx)) { grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue, exec_ctx->stolen_closure, - exec_ctx->stolen_closure->error); + exec_ctx->stolen_closure->error_data.error); GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue, "exec_ctx_sched"); exec_ctx->stealing_from_workqueue = NULL; @@ -98,7 +98,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { "exec_ctx_sched"); exec_ctx->stealing_from_workqueue = NULL; exec_ctx->stolen_closure = NULL; - grpc_error *error = c->error; + grpc_error *error = c->error_data.error; GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0); c->cb(exec_ctx, c->cb_arg, error); GRPC_ERROR_UNREF(error); @@ -125,7 +125,7 @@ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } else if (exec_ctx->stealing_from_workqueue == NULL) { exec_ctx->stealing_from_workqueue = offload_target_or_null; - closure->error = error; + closure->error_data.error = error; exec_ctx->stolen_closure = closure; } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) { grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error); @@ -133,8 +133,8 @@ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, } else { /* stealing_from_workqueue == offload_target_or_null */ grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, exec_ctx->stolen_closure, - exec_ctx->stolen_closure->error); - closure->error = error; + exec_ctx->stolen_closure->error_data.error); + closure->error_data.error = error; exec_ctx->stolen_closure = closure; GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); } diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index ecfea68f564..6c27c3b41ed 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -171,7 +171,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { wakeup(exec_ctx, workqueue); } grpc_closure *cl = (grpc_closure *)n; - grpc_error *clerr = cl->error; + grpc_error *clerr = cl->error_data.error; cl->cb(exec_ctx, cl->cb_arg, clerr); GRPC_ERROR_UNREF(clerr); } @@ -185,7 +185,7 @@ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, GPR_TIMER_BEGIN("workqueue.enqueue", 0); gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2); GPR_ASSERT(last & 1); - closure->error = error; + closure->error_data.error = error; gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next); if (last == 1) { wakeup(exec_ctx, workqueue); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 8ed33bee5f0..5f120a69c36 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1405,6 +1405,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *stream_op = &bctl->op; memset(stream_op, 0, sizeof(*stream_op)); + stream_op->covered_by_poller = true; if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index fe47fea3062..42f51c9ce4d 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -113,6 +113,10 @@ typedef struct grpc_transport_stream_op { have been completed. */ grpc_closure *on_complete; + /** Is the completion of this op covered by a poller (if false: the op should + complete independently of some pollset being polled) */ + bool covered_by_poller; + /** Send initial metadata to the peer, from the provided metadata batch. idempotent_request MUST be set if this is non-null */ grpc_metadata_batch *send_initial_metadata; diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c index 7cf016d82cc..cfb6159b17f 100644 --- a/test/core/iomgr/combiner_test.c +++ b/test/core/iomgr/combiner_test.c @@ -61,7 +61,7 @@ static void test_execute_one(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(set_bool_to_true, &done), - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, false); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(done); grpc_combiner_destroy(&exec_ctx, lock); @@ -96,7 +96,8 @@ static void execute_many_loop(void *a) { c->ctr = &args->ctr; c->value = n++; grpc_combiner_execute(&exec_ctx, args->lock, - grpc_closure_create(check_one, c), GRPC_ERROR_NONE); + grpc_closure_create(check_one, c), GRPC_ERROR_NONE, + false); grpc_exec_ctx_flush(&exec_ctx); } gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100)); @@ -132,9 +133,8 @@ static void in_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } static void add_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_combiner_execute_finally(exec_ctx, arg, - grpc_closure_create(in_finally, NULL), - GRPC_ERROR_NONE, false); + grpc_combiner_execute_finally( + exec_ctx, arg, grpc_closure_create(in_finally, NULL), GRPC_ERROR_NONE); } static void test_execute_finally(void) { @@ -143,7 +143,7 @@ static void test_execute_finally(void) { grpc_combiner *lock = grpc_combiner_create(NULL); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(add_finally, lock), - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, false); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(got_in_finally); grpc_combiner_destroy(&exec_ctx, lock);