reviewable/pr8008/r2
Craig Tiller 9 years ago
parent 096d6a6b05
commit 86037cd0e7
  1. 22
      src/core/lib/iomgr/combiner.c
  2. 8
      src/core/lib/iomgr/error.h
  3. 2
      src/core/lib/iomgr/tcp_posix.c
  4. 62
      src/core/lib/iomgr/workqueue_posix.c

@ -112,7 +112,8 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
}
static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock) {
lock->next_combiner_on_this_exec_ctx = NULL;
if (exec_ctx->active_combiner == NULL) {
exec_ctx->active_combiner = exec_ctx->last_combiner = lock;
@ -122,6 +123,15 @@ static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
}
static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock) {
lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner;
exec_ctx->active_combiner = lock;
if (lock->next_combiner_on_this_exec_ctx == NULL) {
exec_ctx->last_combiner = lock;
}
}
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *cl, grpc_error *error,
bool covered_by_poller) {
@ -140,7 +150,7 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
if (last == 1) {
// code will be written when the exec_ctx calls
// grpc_combiner_continue_exec_ctx
queue_on_exec_ctx(exec_ctx, lock);
push_last_on_exec_ctx(exec_ctx, lock);
}
GPR_TIMER_END("combiner.execute", 0);
}
@ -155,7 +165,7 @@ static void move_next(grpc_exec_ctx *exec_ctx) {
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_combiner *lock = arg;
queue_on_exec_ctx(exec_ctx, lock);
push_last_on_exec_ctx(exec_ctx, lock);
}
static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
@ -230,10 +240,11 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
GPR_TIMER_MARK("unref", 0);
move_next(exec_ctx);
lock->time_to_execute_final_list = false;
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2);
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state));
lock->time_to_execute_final_list = false;
switch (old_state) {
default:
// we have multiple queued work items: just continue executing them
@ -245,11 +256,9 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
break;
case 3: // had one count, one unorphaned --> unlocked unorphaned
move_next(exec_ctx);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
case 2: // and one count, one orphaned --> unlocked and orphaned
move_next(exec_ctx);
really_destroy(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
@ -260,6 +269,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
GPR_UNREACHABLE_CODE(return true);
}
push_first_on_exec_ctx(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
}

@ -123,9 +123,13 @@ typedef enum {
GRPC_ERROR_TIME_CREATED,
} grpc_error_times;
/// The following "special" errors can be propagated without allocating memory.
/// They are always even so that other code (particularly combiner locks) can
/// safely use the lower bit for themselves.
#define GRPC_ERROR_NONE ((grpc_error *)NULL)
#define GRPC_ERROR_OOM ((grpc_error *)1)
#define GRPC_ERROR_CANCELLED ((grpc_error *)2)
#define GRPC_ERROR_OOM ((grpc_error *)2)
#define GRPC_ERROR_CANCELLED ((grpc_error *)4)
const char *grpc_error_string(grpc_error *error);
void grpc_error_free_string(const char *str);

@ -279,7 +279,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->finished_edge = false;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, grpc_fd_get_workqueue(tcp->em_fd));
grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
}
}

@ -76,7 +76,8 @@ static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
static void workqueue_orphan(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) {
gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -1);
if (last == 1) {
workqueue_destroy(exec_ctx, workqueue);
}
}
@ -143,37 +144,40 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_free(workqueue);
} else {
error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
if (error == GRPC_ERROR_NONE) {
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure);
} else {
if (error != GRPC_ERROR_NONE) {
/* recurse to get error handling */
on_readable(exec_ctx, arg, error);
}
if (n == NULL) {
/* try again - queue in an inconsistant state */
wakeup(exec_ctx, workqueue);
} else {
switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) {
case 3: // had one count, one unorphaned --> done, unorphaned
break;
case 2: // had one count, one orphaned --> done, orphaned
workqueue_destroy(exec_ctx, workqueue);
break;
case 1:
case 0:
// these values are illegal - representing an already done or
// deleted workqueue
GPR_UNREACHABLE_CODE(break);
default:
// schedule a wakeup since there's more to do
wakeup(exec_ctx, workqueue);
gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
if (n == NULL) {
/* try again - queue in an ephemerally inconsistent state */
wakeup(exec_ctx, workqueue);
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure);
} else {
gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -2);
switch (last) {
default:
// schedule a wakeup since there's more to do
wakeup(exec_ctx, workqueue);
break;
case 3: // had one count, one unorphaned --> done, unorphaned
break;
case 2: // had one count, one orphaned --> done, orphaned
workqueue_destroy(exec_ctx, workqueue);
break;
case 1:
case 0:
// these values are illegal - representing an already done or
// deleted workqueue
GPR_UNREACHABLE_CODE(break);
}
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure);
grpc_closure *cl = (grpc_closure *)n;
grpc_error *clerr = cl->error_data.error;
grpc_closure_run(exec_ctx, cl, clerr);
}
grpc_closure *cl = (grpc_closure *)n;
grpc_error *clerr = cl->error_data.error;
cl->cb(exec_ctx, cl->cb_arg, clerr);
GRPC_ERROR_UNREF(clerr);
}
}
@ -183,6 +187,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error) {
GPR_TIMER_BEGIN("workqueue.enqueue", 0);
GRPC_WORKQUEUE_REF(workqueue, "enqueue");
gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
GPR_ASSERT(last & 1);
closure->error_data.error = error;
@ -190,6 +195,7 @@ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
if (last == 1) {
wakeup(exec_ctx, workqueue);
}
GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
GPR_TIMER_END("workqueue.enqueue", 0);
}

Loading…
Cancel
Save