Note polling coverage when taking combiner locks: resolves offload issues

reviewable/pr8008/r2
Craig Tiller 9 years ago
parent d58daa2b2f
commit a7cd41cc46
  1. src/core/ext/transport/chttp2/transport/chttp2_transport.c (72)
  2. src/core/ext/transport/chttp2/transport/frame_data.c (7)
  3. src/core/ext/transport/chttp2/transport/internal.h (3)
  4. src/core/ext/transport/chttp2/transport/stream_lists.c (5)
  5. src/core/lib/iomgr/closure.c (6)
  6. src/core/lib/iomgr/closure.h (5)
  7. src/core/lib/iomgr/combiner.c (112)
  8. src/core/lib/iomgr/combiner.h (7)
  9. src/core/lib/iomgr/exec_ctx.c (12)
  10. src/core/lib/iomgr/workqueue_posix.c (4)
  11. src/core/lib/surface/call.c (1)
  12. src/core/lib/transport/transport.h (4)
  13. test/core/iomgr/combiner_test.c (12)

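In outline: grpc_combiner_execute now takes a covered_by_poller bit, which is
packed alongside the grpc_error * into the closure's scratch word; the
combiner keeps an atomic count of queued covered closures and only offloads
work to its optional workqueue while that count is positive, i.e. while some
poller is known to be polling for the queued work's completion. The new call
shape, taken from the perform_stream_op hunk below:

    grpc_combiner_execute(exec_ctx, t->combiner,
                          &op->transport_private.closure, GRPC_ERROR_NONE,
                          op->covered_by_poller);
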
@ -372,7 +372,7 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_combiner_execute(exec_ctx, t->combiner,
grpc_closure_create(destroy_transport_locked, t),
GRPC_ERROR_NONE);
GRPC_ERROR_NONE, false);
}
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->destroy_stream_arg = and_free_memory;
grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
grpc_combiner_execute(exec_ctx, t->combiner, &s->destroy_stream,
GRPC_ERROR_NONE);
GRPC_ERROR_NONE, false);
GPR_TIMER_END("destroy_stream", 0);
}
@ -546,13 +546,15 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING;
gpr_log(GPR_DEBUG, "W:%s:%p: IDLE -> WRITING", t->is_client ? "CLIENT" : "SERVER", t);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked,
GRPC_ERROR_NONE, false);
GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME;
gpr_log(GPR_DEBUG, "W:%s:%p: WRITING -> WRITING_MORE", t->is_client ? "CLIENT" : "SERVER", t);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME:
break;
@ -576,8 +578,11 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING;
gpr_log(GPR_DEBUG, "W:%s:%p: WRITING|WRITING_MORE -> WRITING", t->is_client ? "CLIENT" : "SERVER", t);
grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
} else {
gpr_log(GPR_DEBUG, "W:%s:%p: WRITING|WRITING_MORE -> IDLE", t->is_client ? "CLIENT" : "SERVER", t);
t->write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
}
@ -596,7 +601,7 @@ static void write_action_end(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("write_action_end", 0);
grpc_combiner_execute(exec_ctx, t->combiner, &t->write_action_end_locked,
GRPC_ERROR_REF(error));
GRPC_ERROR_REF(error), false);
GPR_TIMER_END("write_action_end", 0);
}
@ -617,13 +622,15 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
case GRPC_CHTTP2_WRITE_STATE_WRITING:
GPR_TIMER_MARK("state=writing", 0);
t->write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
gpr_log(GPR_DEBUG, "W:%s:%p: WRITING -> IDLE", t->is_client ? "CLIENT" : "SERVER", t);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING;
gpr_log(GPR_DEBUG, "W:%s:%p: WRITING_MORE -> WRITING", t->is_client ? "CLIENT" : "SERVER", t);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action,
GRPC_ERROR_NONE, false);
grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action_begin_locked,
GRPC_ERROR_NONE);
break;
}
@ -742,20 +749,22 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
closure->next_data.scratch - CLOSURE_BARRIER_FIRST_REF_BIT);
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
if (error != GRPC_ERROR_NONE) {
if (closure->error == GRPC_ERROR_NONE) {
closure->error =
if (closure->error_data.error == GRPC_ERROR_NONE) {
closure->error_data.error =
GRPC_ERROR_CREATE("Error in HTTP transport completing operation");
closure->error = grpc_error_set_str(
closure->error, GRPC_ERROR_STR_TARGET_ADDRESS, t->peer_string);
closure->error_data.error =
grpc_error_set_str(closure->error_data.error,
GRPC_ERROR_STR_TARGET_ADDRESS, t->peer_string);
}
closure->error = grpc_error_add_child(closure->error, error);
closure->error_data.error =
grpc_error_add_child(closure->error_data.error, error);
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = NULL;
}
grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL);
grpc_exec_ctx_sched(exec_ctx, closure, closure->error_data.error, NULL);
}
*pclosure = NULL;
}
@ -842,7 +851,8 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
grpc_chttp2_stream *s = gs;
grpc_chttp2_transport *t = s->t;
grpc_combiner_execute(exec_ctx, t->combiner, &s->complete_fetch_locked,
GRPC_ERROR_REF(error));
GRPC_ERROR_REF(error),
s->complete_fetch_covered_by_poller);
}
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
@ -873,7 +883,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
/* use final_data as a barrier until enqueue time; the initial counter is
dropped at the end of this function */
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error = GRPC_ERROR_NONE;
on_complete->error_data.error = GRPC_ERROR_NONE;
if (op->collect_stats != NULL) {
GPR_ASSERT(s->collecting_stats == NULL);
@ -959,6 +969,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->fetched_send_message_length = 0;
s->fetching_slice_end_offset =
(ssize_t)s->flow_controlled_buffer.length + (ssize_t)len;
s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
s->fetching_slice_end_offset -= 65536;
}
@ -1063,7 +1074,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
op->transport_private.args[1] = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure,
GRPC_ERROR_NONE);
GRPC_ERROR_NONE, op->covered_by_poller);
GPR_TIMER_END("perform_stream_op", 0);
}
@ -1173,7 +1184,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
op);
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure,
GRPC_ERROR_NONE);
GRPC_ERROR_NONE, false);
}
/*******************************************************************************
@ -1265,7 +1276,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
if (s->data_parser.parsing_frame != NULL) {
grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error), 0);
exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
s->data_parser.parsing_frame = NULL;
}
@ -1637,7 +1648,7 @@ static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("reading_action", 0);
grpc_chttp2_transport *t = tp;
grpc_combiner_execute(exec_ctx, t->combiner, &t->read_action_locked,
GRPC_ERROR_REF(error));
GRPC_ERROR_REF(error), false);
GPR_TIMER_END("reading_action", 0);
}
@ -1868,7 +1879,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
bs);
grpc_combiner_execute(exec_ctx, bs->transport->combiner,
&bs->next_action.closure, GRPC_ERROR_NONE);
&bs->next_action.closure, GRPC_ERROR_NONE, false);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return 0;
}
@ -1893,7 +1904,7 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
bs);
grpc_combiner_execute(exec_ctx, bs->transport->combiner, &bs->destroy_action,
GRPC_ERROR_NONE);
GRPC_ERROR_NONE, false);
GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
@ -1916,9 +1927,9 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&bs->slice_mu);
}
static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx,
void *bsp, grpc_error *error) {
grpc_chttp2_incoming_byte_stream *bs = bsp;
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
if (error != GRPC_ERROR_NONE) {
grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
bs->on_next = NULL;
@ -1928,21 +1939,6 @@ static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx,
incoming_byte_stream_unref(exec_ctx, bs);
}
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, int from_parsing_thread) {
GPR_TIMER_BEGIN("grpc_chttp2_incoming_byte_stream_finished", 0);
if (from_parsing_thread) {
grpc_closure_init(&bs->finished_action,
incoming_byte_stream_finished_locked, bs);
grpc_combiner_execute(exec_ctx, bs->transport->combiner,
&bs->finished_action, GRPC_ERROR_REF(error));
} else {
incoming_byte_stream_finished_locked(exec_ctx, bs, error);
}
GPR_TIMER_END("grpc_chttp2_incoming_byte_stream_finished", 0);
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags) {

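The gpr_log lines added above trace a three-state write state machine. A
minimal standalone sketch of those transitions (illustrative only: refcounts
and closure scheduling are elided, and the GRPC_CHTTP2_WRITE_STATE_* names
are shortened):

    #include <stdbool.h>

    typedef enum { IDLE, WRITING, WRITING_WITH_MORE_TO_COME } write_state;

    /* grpc_chttp2_initiate_write: only an IDLE transport schedules
       write_action_begin_locked; one already WRITING just records that
       another write is wanted once the current one ends. */
    static write_state initiate_write(write_state s) {
      if (s == IDLE) return WRITING;                      /* kick off a write */
      if (s == WRITING) return WRITING_WITH_MORE_TO_COME; /* coalesce */
      return s;                            /* WRITING_MORE: already coalesced */
    }

    /* write_action_begin_locked: a closed transport, or one with nothing to
       send, drops straight back to IDLE instead of scheduling write_action. */
    static write_state begin_write(bool have_bytes_to_send) {
      return have_bytes_to_send ? WRITING : IDLE;
    }

    /* write_action_end_locked: loop back into WRITING if more work arrived
       while the last write was in flight; otherwise rest at IDLE. */
    static write_state end_write(write_state s) {
      return (s == WRITING_WITH_MORE_TO_COME) ? WRITING : IDLE;
    }
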
@ -53,8 +53,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) {
if (parser->parsing_frame != NULL) {
grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, parser->parsing_frame, GRPC_ERROR_CREATE("Parser destroyed"),
1);
exec_ctx, parser->parsing_frame, GRPC_ERROR_CREATE("Parser destroyed"));
}
GRPC_ERROR_UNREF(parser->error);
}
@ -245,7 +244,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE, 1);
GRPC_ERROR_NONE);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_ERROR_NONE;
@ -256,7 +255,7 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
gpr_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE, 1);
GRPC_ERROR_NONE);
p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */

@ -349,6 +349,7 @@ struct grpc_chttp2_stream {
uint32_t fetched_send_message_length;
gpr_slice fetching_slice;
int64_t fetching_slice_end_offset;
bool complete_fetch_covered_by_poller;
grpc_closure complete_fetch;
grpc_closure complete_fetch_locked;
grpc_closure *fetching_send_message_finished;
@ -643,7 +644,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
gpr_slice slice);
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, int from_parsing_thread);
grpc_error *error);
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
const uint8_t *opaque_8bytes);

@ -163,9 +163,8 @@ void grpc_chttp2_list_add_check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s) {
if (!t->check_read_ops_scheduled) {
GRPC_CHTTP2_REF_TRANSPORT(t, "initiate_read_flush_locked");
grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->read_action_flush_locked, GRPC_ERROR_NONE,
false);
grpc_combiner_execute_finally(
exec_ctx, t->combiner, &t->read_action_flush_locked, GRPC_ERROR_NONE);
t->check_read_ops_scheduled = true;
}
stream_list_add(t, s, GRPC_CHTTP2_LIST_CHECK_READ_OPS);

@ -51,7 +51,7 @@ void grpc_closure_list_append(grpc_closure_list *closure_list,
GRPC_ERROR_UNREF(error);
return;
}
closure->error = error;
closure->error_data.error = error;
closure->next_data.next = NULL;
if (closure_list->head == NULL) {
closure_list->head = closure;
@ -64,8 +64,8 @@ void grpc_closure_list_append(grpc_closure_list *closure_list,
void grpc_closure_list_fail_all(grpc_closure_list *list,
grpc_error *forced_failure) {
for (grpc_closure *c = list->head; c != NULL; c = c->next_data.next) {
if (c->error == GRPC_ERROR_NONE) {
c->error = GRPC_ERROR_REF(forced_failure);
if (c->error_data.error == GRPC_ERROR_NONE) {
c->error_data.error = GRPC_ERROR_REF(forced_failure);
}
}
GRPC_ERROR_UNREF(forced_failure);

@ -76,7 +76,10 @@ struct grpc_closure {
void *cb_arg;
/** Once queued, the result of the closure. Before then: scratch space */
grpc_error *error;
union {
grpc_error *error;
uintptr_t scratch;
} error_data;
};
/** Initializes \a closure with \a cb and \a cb_arg. */

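The error/scratch union added above is what lets a combiner stash packed
per-closure data in a queued closure before the real error is filled in. A
self-contained sketch of the aliasing, with a stand-in for grpc_error:

    #include <assert.h>
    #include <stdint.h>

    typedef struct { int code; } fake_error; /* stand-in for grpc_error */

    union error_data {
      fake_error *error; /* once queued for execution: the closure's result */
      uintptr_t scratch; /* before then: free space for the queue's owner */
    };

    int main(void) {
      union error_data d;
      d.scratch = 42; /* owner-private packed data while queued... */
      assert(d.scratch == 42);
      fake_error e = {1};
      d.error = &e;   /* ...replaced by the real error before the cb runs */
      assert(d.error->code == 1);
      return 0;
    }
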
@ -58,7 +58,9 @@ struct grpc_combiner {
// lower bit - zero if orphaned
// other bits - number of items queued on the lock
gpr_atm state;
bool take_async_break_before_final_list;
// number of elements in the list that are covered by a poller: if >0, we can
// offload safely
gpr_atm covered_by_poller;
bool time_to_execute_final_list;
grpc_closure_list final_list;
grpc_closure offload;
@ -66,14 +68,27 @@ struct grpc_combiner {
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
typedef struct {
grpc_error *error;
bool covered_by_poller;
} error_data;
static uintptr_t pack_error_data(error_data d) {
return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0);
}
static error_data unpack_error_data(uintptr_t p) {
return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1};
}
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->next_combiner_on_this_exec_ctx = NULL;
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
gpr_atm_no_barrier_store(&lock->state, 1);
gpr_atm_no_barrier_store(&lock->covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
lock->take_async_break_before_final_list = false;
grpc_closure_list_init(&lock->final_list);
grpc_closure_init(&lock->offload, offload, lock);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
@ -108,13 +123,18 @@ static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *cl, grpc_error *error) {
grpc_closure *cl, grpc_error *error,
bool covered_by_poller) {
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl));
gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d", lock, cl, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
GPR_ASSERT(last & 1); // ensure lock has not been destroyed
cl->error = error;
cl->error_data.scratch =
pack_error_data((error_data){error, covered_by_poller});
if (covered_by_poller) {
gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, 1);
}
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
if (last == 1) {
// code will be written when the exec_ctx calls
@ -152,11 +172,12 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
if (lock->optional_workqueue != NULL &&
grpc_exec_ctx_ready_to_finish(exec_ctx)) {
grpc_exec_ctx_ready_to_finish(exec_ctx) &&
gpr_atm_acq_load(&lock->covered_by_poller) > 0) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on, and we have a workqueue (and so
// can help the execution context out): schedule remaining work to be picked
// up on the workqueue
// this execution context wants to move on, and we have a workqueue (and
// so can help the execution context out): schedule remaining work to be
// picked up on the workqueue
queue_offload(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
@ -173,7 +194,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
// queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later)
GPR_TIMER_MARK("delay_busy", 0);
if (lock->optional_workqueue != NULL) {
if (lock->optional_workqueue != NULL && gpr_atm_acq_load(&lock->covered_by_poller) > 0) {
queue_offload(exec_ctx, lock);
}
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
@ -181,37 +202,28 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
GPR_TIMER_BEGIN("combiner.exec1", 0);
grpc_closure *cl = (grpc_closure *)n;
grpc_error *error = cl->error;
cl->cb(exec_ctx, cl->cb_arg, error);
GRPC_ERROR_UNREF(error);
error_data err = unpack_error_data(cl->error_data.scratch);
cl->cb(exec_ctx, cl->cb_arg, err.error);
if (err.covered_by_poller) {
gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, -1);
}
GRPC_ERROR_UNREF(err.error);
GPR_TIMER_END("combiner.exec1", 0);
} else {
if (lock->take_async_break_before_final_list) {
GPR_TIMER_MARK("async_break", 0);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p take async break", lock));
lock->take_async_break_before_final_list = false;
if (lock->optional_workqueue != NULL) {
queue_offload(exec_ctx, lock);
}
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
} else {
grpc_closure *c = lock->final_list.head;
GPR_ASSERT(c != NULL);
grpc_closure_list_init(&lock->final_list);
lock->take_async_break_before_final_list = false;
int loops = 0;
while (c != NULL) {
GPR_TIMER_BEGIN("combiner.exec_1final", 0);
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error;
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
GPR_TIMER_END("combiner.exec_1final", 0);
}
grpc_closure *c = lock->final_list.head;
GPR_ASSERT(c != NULL);
grpc_closure_list_init(&lock->final_list);
int loops = 0;
while (c != NULL) {
GPR_TIMER_BEGIN("combiner.exec_1final", 0);
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
GPR_TIMER_END("combiner.exec_1final", 0);
}
}
@ -250,35 +262,27 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error) {
grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
GRPC_ERROR_REF(error), false);
GRPC_ERROR_REF(error));
}
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
bool force_async_break) {
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG,
"C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p",
lock, closure, force_async_break, exec_ctx->active_combiner));
grpc_closure *closure, grpc_error *error) {
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_execute_finally c=%p; ac=%p",
lock, closure, exec_ctx->active_combiner));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
grpc_combiner_execute(exec_ctx, lock,
grpc_closure_create(enqueue_finally, closure), error);
grpc_closure_create(enqueue_finally, closure), error,
false);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
if (force_async_break) {
lock->take_async_break_before_final_list = true;
}
if (grpc_closure_list_empty(lock->final_list)) {
gpr_atm_full_fetch_add(&lock->state, 2);
}
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
void grpc_combiner_force_async_finally(grpc_combiner *lock) {
lock->take_async_break_before_final_list = true;
}

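pack_error_data/unpack_error_data above depend on grpc_error * values being
at least 2-byte aligned, which leaves bit 0 free to carry covered_by_poller.
A self-contained sketch of the same tagging trick, with grpc_error replaced
by a local struct for illustration:

    #include <assert.h>
    #include <stdbool.h>
    #include <stdint.h>

    typedef struct { int code; } my_error; /* stand-in for grpc_error */

    typedef struct {
      my_error *error;
      bool covered_by_poller;
    } error_data;

    static uintptr_t pack_error_data(error_data d) {
      /* a pointer to a type with alignment >= 2 never has bit 0 set */
      assert(((uintptr_t)d.error & 1) == 0);
      return (uintptr_t)d.error | (d.covered_by_poller ? 1 : 0);
    }

    static error_data unpack_error_data(uintptr_t p) {
      return (error_data){(my_error *)(p & ~(uintptr_t)1), (p & 1) != 0};
    }

    int main(void) {
      my_error e = {42};
      uintptr_t packed = pack_error_data((error_data){&e, true});
      error_data d = unpack_error_data(packed);
      assert(d.error == &e && d.covered_by_poller);
      return 0;
    }
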
@ -52,7 +52,8 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
// Execute \a action within the lock.
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error);
grpc_closure *closure, grpc_error *error,
bool covered_by_poller);
// Execute \a action within the lock just prior to unlocking.
// if \a hint_async_break is additionally set, the combiner tries to trip
// through the workqueue between finishing the primary queue of combined
@ -60,9 +61,7 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
// Takes a very slow and round-about path if not called from a
// grpc_combiner_execute closure
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
bool hint_async_break);
void grpc_combiner_force_async_finally(grpc_combiner *lock);
grpc_closure *closure, grpc_error *error);
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);

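Now that hint_async_break is gone, the calling pattern for
grpc_combiner_execute_finally is visible in the combiner_test.c hunks at the
bottom of this commit: from a closure already running under the lock it
queues work for just before unlock, and from anywhere else it takes the slow
path through grpc_combiner_execute. An excerpt-shaped sketch (gRPC-internal
types, so not standalone), mirroring add_finally/in_finally from the test:

    static void in_finally(grpc_exec_ctx *exec_ctx, void *arg,
                           grpc_error *error) {
      /* runs while the combiner drains its final list, just before unlock */
    }

    static void add_finally(grpc_exec_ctx *exec_ctx, void *arg,
                            grpc_error *error) {
      /* arg is the combiner this closure is currently running under */
      grpc_combiner_execute_finally(exec_ctx, arg,
                                    grpc_closure_create(in_finally, NULL),
                                    GRPC_ERROR_NONE);
    }
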
@ -67,7 +67,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error;
grpc_error *error = c->error_data.error;
did_something = true;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
c->cb(exec_ctx, c->cb_arg, error);
@ -87,7 +87,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,
exec_ctx->stolen_closure,
exec_ctx->stolen_closure->error);
exec_ctx->stolen_closure->error_data.error);
GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
"exec_ctx_sched");
exec_ctx->stealing_from_workqueue = NULL;
@ -98,7 +98,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
"exec_ctx_sched");
exec_ctx->stealing_from_workqueue = NULL;
exec_ctx->stolen_closure = NULL;
grpc_error *error = c->error;
grpc_error *error = c->error_data.error;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0);
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
@ -125,7 +125,7 @@ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
} else if (exec_ctx->stealing_from_workqueue == NULL) {
exec_ctx->stealing_from_workqueue = offload_target_or_null;
closure->error = error;
closure->error_data.error = error;
exec_ctx->stolen_closure = closure;
} else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) {
grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
@ -133,8 +133,8 @@ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
} else { /* stealing_from_workqueue == offload_target_or_null */
grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
exec_ctx->stolen_closure,
exec_ctx->stolen_closure->error);
closure->error = error;
exec_ctx->stolen_closure->error_data.error);
closure->error_data.error = error;
exec_ctx->stolen_closure = closure;
GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
}

@ -171,7 +171,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
wakeup(exec_ctx, workqueue);
}
grpc_closure *cl = (grpc_closure *)n;
grpc_error *clerr = cl->error;
grpc_error *clerr = cl->error_data.error;
cl->cb(exec_ctx, cl->cb_arg, clerr);
GRPC_ERROR_UNREF(clerr);
}
@ -185,7 +185,7 @@ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
GPR_TIMER_BEGIN("workqueue.enqueue", 0);
gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
GPR_ASSERT(last & 1);
closure->error = error;
closure->error_data.error = error;
gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next);
if (last == 1) {
wakeup(exec_ctx, workqueue);

@ -1405,6 +1405,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *stream_op = &bctl->op;
memset(stream_op, 0, sizeof(*stream_op));
stream_op->covered_by_poller = true;
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");

@ -113,6 +113,10 @@ typedef struct grpc_transport_stream_op {
have been completed. */
grpc_closure *on_complete;
/** Is the completion of this op covered by a poller (if false: the op should
complete independently of some pollset being polled) */
bool covered_by_poller;
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;

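For a concrete setting of this flag, the call.c hunk above marks every
application-started batch as covered, since a completion queue poller will be
polling for that batch to finish; ops constructed internally without such a
guarantee pass false. Excerpt-shaped sketch (comment added here for
illustration):

    grpc_transport_stream_op *stream_op = &bctl->op;
    memset(stream_op, 0, sizeof(*stream_op));
    /* a completion queue poller is (or will be) polling for this batch, so
       work it triggers under the transport combiner can be offloaded safely */
    stream_op->covered_by_poller = true;
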
@ -61,7 +61,7 @@ static void test_execute_one(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_execute(&exec_ctx, lock,
grpc_closure_create(set_bool_to_true, &done),
GRPC_ERROR_NONE);
GRPC_ERROR_NONE, false);
grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(done);
grpc_combiner_destroy(&exec_ctx, lock);
@ -96,7 +96,8 @@ static void execute_many_loop(void *a) {
c->ctr = &args->ctr;
c->value = n++;
grpc_combiner_execute(&exec_ctx, args->lock,
grpc_closure_create(check_one, c), GRPC_ERROR_NONE);
grpc_closure_create(check_one, c), GRPC_ERROR_NONE,
false);
grpc_exec_ctx_flush(&exec_ctx);
}
gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100));
@ -132,9 +133,8 @@ static void in_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
}
static void add_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_combiner_execute_finally(exec_ctx, arg,
grpc_closure_create(in_finally, NULL),
GRPC_ERROR_NONE, false);
grpc_combiner_execute_finally(
exec_ctx, arg, grpc_closure_create(in_finally, NULL), GRPC_ERROR_NONE);
}
static void test_execute_finally(void) {
@ -143,7 +143,7 @@ static void test_execute_finally(void) {
grpc_combiner *lock = grpc_combiner_create(NULL);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(add_finally, lock),
GRPC_ERROR_NONE);
GRPC_ERROR_NONE, false);
grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(got_in_finally);
grpc_combiner_destroy(&exec_ctx, lock);
