stream_op cleanup: iomgr closure, executor changes

reviewable/pr3993/r1
Craig Tiller 9 years ago
parent 577c9b2f11
commit 48db18f910
  1. 23
      src/core/iomgr/closure.c
  2. 19
      src/core/iomgr/closure.h
  3. 13
      src/core/iomgr/executor.c

@ -39,18 +39,17 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) { void *cb_arg) {
closure->cb = cb; closure->cb = cb;
closure->cb_arg = cb_arg; closure->cb_arg = cb_arg;
closure->next = NULL; closure->final_data = 0;
} }
void grpc_closure_list_add(grpc_closure_list *closure_list, void grpc_closure_list_add(grpc_closure_list *closure_list,
grpc_closure *closure, int success) { grpc_closure *closure, int success) {
if (closure == NULL) return; if (closure == NULL) return;
closure->next = NULL; closure->final_data = (success != 0);
closure->success = success;
if (closure_list->head == NULL) { if (closure_list->head == NULL) {
closure_list->head = closure; closure_list->head = closure;
} else { } else {
closure_list->tail->next = closure; closure_list->tail->final_data |= (gpr_uintptr)closure;
} }
closure_list->tail = closure; closure_list->tail = closure;
} }
@ -66,22 +65,12 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
if (dst->head == NULL) { if (dst->head == NULL) {
*dst = *src; *dst = *src;
} else { } else {
dst->tail->next = src->head; dst->tail->final_data |= (gpr_uintptr)src->head;
dst->tail = src->tail; dst->tail = src->tail;
} }
src->head = src->tail = NULL; src->head = src->tail = NULL;
} }
grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) {
grpc_closure *head;
if (list->head == NULL) {
return NULL;
}
head = list->head;
list->head = list->head->next;
return head;
}
typedef struct { typedef struct {
grpc_iomgr_cb_func cb; grpc_iomgr_cb_func cb;
void *cb_arg; void *cb_arg;
@ -103,3 +92,7 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
grpc_closure_init(&wc->wrapper, closure_wrapper, wc); grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
return &wc->wrapper; return &wc->wrapper;
} }
grpc_closure *grpc_closure_next(grpc_closure *closure) {
return (grpc_closure *)(closure->final_data & ~(gpr_uintptr)1);
}

@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H #ifndef GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H
#define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H #define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H
#include <stddef.h> #include <grpc/support/port_platform.h>
struct grpc_closure; struct grpc_closure;
typedef struct grpc_closure grpc_closure; typedef struct grpc_closure grpc_closure;
@ -64,13 +64,10 @@ struct grpc_closure {
/** Arguments to be passed to "cb". */ /** Arguments to be passed to "cb". */
void *cb_arg; void *cb_arg;
/** Internal. A boolean indication to "cb" on the state of the iomgr. /** Once enqueued, contains in the lower bit the success of the closure,
* For instance, closures created during a shutdown would have this field set and in the upper bits the pointer to the next closure in the list.
* to false. */ Before enqueing for execution, this is usable for scratch data. */
int success; gpr_uintptr final_data;
/**< Internal. Do not touch */
struct grpc_closure *next;
}; };
/** Initializes \a closure with \a cb and \a cb_arg. */ /** Initializes \a closure with \a cb and \a cb_arg. */
@ -91,10 +88,10 @@ void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
/** append all closures from \a src to \a dst and empty \a src. */ /** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
/** pop (return and remove) the head closure from \a list. */
grpc_closure *grpc_closure_list_pop(grpc_closure_list *list);
/** return whether \a list is empty. */ /** return whether \a list is empty. */
int grpc_closure_list_empty(grpc_closure_list list); int grpc_closure_list_empty(grpc_closure_list list);
/** return the next pointer for a queued closure list */
grpc_closure *grpc_closure_next(grpc_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */ #endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */

@ -63,8 +63,6 @@ void grpc_executor_init() {
/* thread body */ /* thread body */
static void closure_exec_thread_func(void *ignored) { static void closure_exec_thread_func(void *ignored) {
grpc_closure *closure;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (1) { while (1) {
gpr_mu_lock(&g_executor.mu); gpr_mu_lock(&g_executor.mu);
@ -72,16 +70,16 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu); gpr_mu_unlock(&g_executor.mu);
break; break;
} }
closure = grpc_closure_list_pop(&g_executor.closures); if (grpc_closure_list_empty(g_executor.closures)) {
if (closure == NULL) {
/* no more work, time to die */ /* no more work, time to die */
GPR_ASSERT(g_executor.busy == 1); GPR_ASSERT(g_executor.busy == 1);
g_executor.busy = 0; g_executor.busy = 0;
gpr_mu_unlock(&g_executor.mu); gpr_mu_unlock(&g_executor.mu);
break; break;
} else {
grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
} }
gpr_mu_unlock(&g_executor.mu); gpr_mu_unlock(&g_executor.mu);
closure->cb(&exec_ctx, closure->cb_arg, closure->success);
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
} }
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
@ -125,7 +123,6 @@ void grpc_executor_enqueue(grpc_closure *closure, int success) {
void grpc_executor_shutdown() { void grpc_executor_shutdown() {
int pending_join; int pending_join;
grpc_closure *closure;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&g_executor.mu); gpr_mu_lock(&g_executor.mu);
@ -136,9 +133,7 @@ void grpc_executor_shutdown() {
* list below because we aren't accepting new work */ * list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */ /* Execute pending callbacks, some may be performing cleanups */
while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) { grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
closure->cb(&exec_ctx, closure->cb_arg, closure->success);
}
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) { if (pending_join) {

Loading…
Cancel
Save