reviewable/pr11758/r1
Craig Tiller 7 years ago
parent 764cdbaaf0
commit af723b0424
  1. 50
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 32
      src/core/lib/iomgr/executor.c
  3. 3
      src/core/lib/surface/call.c
  4. 8
      test/core/end2end/tests/resource_quota_server.c

@ -886,6 +886,21 @@ static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t,
GPR_UNREACHABLE_CODE(return NULL); GPR_UNREACHABLE_CODE(return NULL);
} }
#define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
static const char *begin_writing_desc(bool partial, bool inlined) {
switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) {
case WRITE_STATE_TUPLE_TO_INT(false, false):
return "begin write in background";
case WRITE_STATE_TUPLE_TO_INT(false, true):
return "begin write in current thread";
case WRITE_STATE_TUPLE_TO_INT(true, false):
return "begin partial write in background";
case WRITE_STATE_TUPLE_TO_INT(true, true):
return "begin partial write in current thread";
}
GPR_UNREACHABLE_CODE(return "bad state tuple");
}
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_error *error_ignored) { grpc_error *error_ignored) {
GPR_TIMER_BEGIN("write_action_begin_locked", 0); GPR_TIMER_BEGIN("write_action_begin_locked", 0);
@ -898,15 +913,17 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
r = grpc_chttp2_begin_write(exec_ctx, t); r = grpc_chttp2_begin_write(exec_ctx, t);
} }
if (r.writing) { if (r.writing) {
set_write_state(exec_ctx, t, grpc_closure_scheduler *scheduler =
r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE write_scheduler(t, r.early_results_scheduled);
: GRPC_CHTTP2_WRITE_STATE_WRITING, set_write_state(
r.partial ? "begin writing partial" : "begin writing"); exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
GRPC_CLOSURE_SCHED( : GRPC_CHTTP2_WRITE_STATE_WRITING,
exec_ctx, begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
GRPC_CLOSURE_INIT(&t->write_action, write_action, t, GPR_ASSERT(scheduler == grpc_schedule_on_exec_ctx ||
write_scheduler(t, r.early_results_scheduled)), scheduler == grpc_executor_scheduler);
GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action,
write_action, t, scheduler),
GRPC_ERROR_NONE);
} else { } else {
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
"begin writing nothing"); "begin writing nothing");
@ -918,6 +935,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
GPR_TIMER_BEGIN("write_action", 0); GPR_TIMER_BEGIN("write_action", 0);
gpr_log(GPR_DEBUG, "W:%p write_action", t);
grpc_endpoint_write( grpc_endpoint_write(
exec_ctx, t->ep, &t->outbuf, exec_ctx, t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
@ -1104,12 +1122,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
if (GRPC_TRACER_ON(grpc_http_trace)) { if (GRPC_TRACER_ON(grpc_http_trace)) {
const char *errstr = grpc_error_string(error); const char *errstr = grpc_error_string(error);
gpr_log(GPR_DEBUG, gpr_log(
"complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s", GPR_DEBUG,
closure, "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
(int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), "write_state=%s",
(int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), t, closure,
desc, errstr); (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
(int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), desc,
errstr, write_state_name(t->write_state));
} }
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) { if (closure->error_data.error == GRPC_ERROR_NONE) {

@ -49,6 +49,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
GPR_TLS_DECL(g_this_thread_state); GPR_TLS_DECL(g_this_thread_state);
static grpc_tracer_flag executor_trace =
GRPC_TRACER_INITIALIZER(false, "executor");
static void executor_thread(void *arg); static void executor_thread(void *arg);
static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
@ -58,6 +61,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
while (c != NULL) { while (c != NULL) {
grpc_closure *next = c->next_data.next; grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error; grpc_error *error = c->error_data.error;
if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
c->file_created, c->line_created);
#else
gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
#endif
}
#ifndef NDEBUG #ifndef NDEBUG
c->scheduled = false; c->scheduled = false;
#endif #endif
@ -119,6 +130,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
} }
void grpc_executor_init(grpc_exec_ctx *exec_ctx) { void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
grpc_register_tracer(&executor_trace);
gpr_atm_no_barrier_store(&g_cur_threads, 0); gpr_atm_no_barrier_store(&g_cur_threads, 0);
grpc_executor_set_threading(exec_ctx, true); grpc_executor_set_threading(exec_ctx, true);
} }
@ -136,18 +148,31 @@ static void executor_thread(void *arg) {
size_t subtract_depth = 0; size_t subtract_depth = 0;
for (;;) { for (;;) {
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG,
"EXECUTOR[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
ts - g_thread_state, subtract_depth);
}
gpr_mu_lock(&ts->mu); gpr_mu_lock(&ts->mu);
ts->depth -= subtract_depth; ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
} }
if (ts->shutdown) { if (ts->shutdown) {
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: shutdown",
ts - g_thread_state);
}
gpr_mu_unlock(&ts->mu); gpr_mu_unlock(&ts->mu);
break; break;
} }
grpc_closure_list exec = ts->elems; grpc_closure_list exec = ts->elems;
ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu); gpr_mu_unlock(&ts->mu);
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: execute",
ts - g_thread_state);
}
subtract_depth = run_closures(&exec_ctx, exec); subtract_depth = run_closures(&exec_ctx, exec);
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
@ -159,6 +184,9 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) { grpc_error *error) {
size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
if (cur_thread_count == 0) { if (cur_thread_count == 0) {
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
}
grpc_closure_list_append(&exec_ctx->closure_list, closure, error); grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
return; return;
} }
@ -166,6 +194,10 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
if (ts == NULL) { if (ts == NULL) {
ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
} }
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p to thread %" PRIdPTR, closure,
ts - g_thread_state);
}
gpr_mu_lock(&ts->mu); gpr_mu_lock(&ts->mu);
if (grpc_closure_list_empty(ts->elems)) { if (grpc_closure_list_empty(ts->elems)) {
gpr_cv_signal(&ts->cv); gpr_cv_signal(&ts->cv);

@ -1172,6 +1172,9 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
} }
static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
gpr_log(GPR_DEBUG, "finish_batch_step: tag=%p steps=%" PRIdPTR,
bctl->completion_data.notify_tag.tag,
gpr_atm_no_barrier_load(&bctl->steps_to_complete.count));
if (gpr_unref(&bctl->steps_to_complete)) { if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl); post_batch_completion(exec_ctx, bctl);
} }

@ -111,10 +111,10 @@ void resource_quota_server(grpc_end2end_test_config config) {
grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024); grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024);
#define NUM_CALLS 100 #define NUM_CALLS 100
#define CLIENT_BASE_TAG 1000 #define CLIENT_BASE_TAG 0x1000
#define SERVER_START_BASE_TAG 2000 #define SERVER_START_BASE_TAG 0x2000
#define SERVER_RECV_BASE_TAG 3000 #define SERVER_RECV_BASE_TAG 0x3000
#define SERVER_END_BASE_TAG 4000 #define SERVER_END_BASE_TAG 0x4000
grpc_arg arg; grpc_arg arg;
arg.key = GRPC_ARG_RESOURCE_QUOTA; arg.key = GRPC_ARG_RESOURCE_QUOTA;

Loading…
Cancel
Save