reviewable/pr8008/r2
Craig Tiller 9 years ago
parent 86037cd0e7
commit 2c4043bd8a
  1. 26
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 4
      src/core/ext/transport/chttp2/transport/writing.c
  3. 35
      src/core/lib/surface/completion_queue.c

@ -538,6 +538,20 @@ grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
* OUTPUT PROCESSING
*/
static const char *write_state_name(grpc_chttp2_write_state st) {
switch (st) {
case GRPC_CHTTP2_WRITE_STATE_IDLE: return "IDLE";
case GRPC_CHTTP2_WRITE_STATE_WRITING: return "WRITING";
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: return "WRITING+MORE";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
static void set_write_state(grpc_chttp2_transport *t, grpc_chttp2_write_state st) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s", t, t->is_client ? "CLIENT" : "SERVER", write_state_name(t->write_state), write_state_name(st)));
t->write_state = st;
}
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
bool covered_by_poller, const char *reason) {
@ -545,14 +559,14 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING;
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked,
GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME;
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME:
break;
@ -576,10 +590,10 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING;
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING);
grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
} else {
t->write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
}
GPR_TIMER_END("write_action_begin_locked", 0);
@ -617,11 +631,11 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITE_STATE_WRITING:
GPR_TIMER_MARK("state=writing", 0);
t->write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING;
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked,

@ -109,6 +109,8 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
bool sent_initial_metadata = s->sent_initial_metadata;
bool now_writing = false;
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t, t->is_client?"CLIENT":"SERVER", s->id, sent_initial_metadata, s->send_initial_metadata!=NULL, s->announce_window));
/* send initial metadata if it's available */
if (!sent_initial_metadata && s->send_initial_metadata) {
grpc_chttp2_encode_header(&t->hpack_compressor, s->id,
@ -120,7 +122,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
now_writing = true;
}
/* send any window updates */
if (s->announce_window > 0 && sent_initial_metadata) {
if (s->announce_window > 0) {
uint32_t announce = s->announce_window;
gpr_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(

@ -319,6 +319,7 @@ typedef struct {
gpr_timespec deadline;
grpc_cq_completion *stolen_completion;
void *tag; /* for pluck */
bool first_loop;
} cq_is_finished_arg;
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
@ -342,7 +343,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
gpr_mu_unlock(cq->mu);
}
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
return !a->first_loop &&
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
#ifndef NDEBUG
@ -370,7 +372,6 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker *worker = NULL;
int first_loop = 1;
gpr_timespec now;
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@ -392,8 +393,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(cc->mu);
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load(&cc->things_queued_ever), cc, deadline, NULL,
NULL};
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cc->things_queued_ever),
.cq = cc,
.deadline = deadline,
.stolen_completion = NULL,
.tag = NULL,
.first_loop = true};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_next_finished, &is_finished_arg);
for (;;) {
@ -427,14 +433,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cc);
break;
}
first_loop = 0;
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
@ -461,6 +466,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
}
is_finished_arg.first_loop = false;
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
@ -523,7 +529,8 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
gpr_mu_unlock(cq->mu);
}
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
return !a->first_loop &&
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
@ -533,7 +540,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
int first_loop = 1;
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@ -556,8 +562,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(cc->mu);
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load(&cc->things_queued_ever), cc, deadline, NULL,
tag};
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cc->things_queued_ever),
.cq = cc,
.deadline = deadline,
.stolen_completion = NULL,
.tag = tag,
.first_loop = true};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_pluck_finished, &is_finished_arg);
for (;;) {
@ -607,7 +618,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
@ -615,7 +626,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
dump_pending_tags(cc);
break;
}
first_loop = 0;
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
@ -642,6 +652,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
break;
}
}
is_finished_arg.first_loop = false;
del_plucker(cc, tag, &worker);
}
done:

Loading…
Cancel
Save