Merge pull request #20328 from soheilhy/h2-write-no-async

Remove out of context writes from H2.
pull/20353/head
Soheil Hassas Yeganeh · 5 years ago · committed by GitHub
commit e67b82c55d
3 changed files:
  1. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (81)
  2. src/core/ext/transport/chttp2/transport/internal.h (6)
  3. test/cpp/qps/driver.cc (21)
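
The change, in short: chttp2 no longer decides per write whether to run the write on the current thread or hand it off to the Executor. The write_scheduler() heuristics, the is_first_write_in_batch flag, and the opt_target plumbing are deleted, and write_action() is now called directly from write_action_begin_locked(), so writes always stay in the context that initiated them. Below is a standalone toy sketch of the before/after dispatch decision (the Transport struct, offload callback, and flags are stand-ins, not gRPC's real types):

#include <functional>
#include <iostream>

// Toy stand-ins for the transport state this PR deletes.
struct Transport {
  bool is_first_write_in_batch = false;  // removed field
  bool optimize_for_throughput = false;  // removed opt_target
};

void write_action(Transport*) { std::cout << "writing frames\n"; }

// Before: pick where the write runs (current thread vs. executor offload).
void begin_write_old(Transport* t, bool partial,
                     const std::function<void(std::function<void()>)>& offload) {
  const bool run_inline =
      t->is_first_write_in_batch && !partial && !t->optimize_for_throughput;
  if (run_inline) {
    write_action(t);                    // stay on this thread
  } else {
    offload([t] { write_action(t); });  // jump to another thread ("out of context")
  }
}

// After: always run the write right here, under the transport combiner.
void begin_write_new(Transport* t) { write_action(t); }

int main() {
  Transport t;
  begin_write_old(&t, /*partial=*/false,
                  [](std::function<void()> f) { f(); });  // toy "executor"
  begin_write_new(&t);
}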

--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -308,21 +308,7 @@ static bool read_channel_args(grpc_chttp2_transport* t,
               grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
     } else if (0 == strcmp(channel_args->args[i].key,
                            GRPC_ARG_OPTIMIZATION_TARGET)) {
-      if (channel_args->args[i].type != GRPC_ARG_STRING) {
-        gpr_log(GPR_ERROR, "%s should be a string",
-                GRPC_ARG_OPTIMIZATION_TARGET);
-      } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
-        t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
-      } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
-        t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
-      } else if (0 ==
-                 strcmp(channel_args->args[i].value.string, "throughput")) {
-        t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
-      } else {
-        gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
-                GRPC_ARG_OPTIMIZATION_TARGET,
-                channel_args->args[i].value.string);
-      }
+      gpr_log(GPR_INFO, "GRPC_ARG_OPTIMIZATION_TARGET is deprecated");
     } else if (0 ==
                strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
       channelz_enabled = grpc_channel_arg_get_bool(
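
For context, GRPC_ARG_OPTIMIZATION_TARGET ("grpc.optimization_target") is a public channel argument; after this hunk the transport no longer reads its value and only logs that it is deprecated. A hedged sketch of how an application would have set it through the C++ API (the target address is a placeholder):

#include <grpcpp/grpcpp.h>

int main() {
  grpc::ChannelArguments args;
  // Used to choose "latency" vs. "throughput" write scheduling; with this
  // change the chttp2 transport ignores the value and logs a deprecation note.
  args.SetString(GRPC_ARG_OPTIMIZATION_TARGET, "throughput");
  auto channel = grpc::CreateCustomChannel(
      "localhost:50051", grpc::InsecureChannelCredentials(), args);
  return channel != nullptr ? 0 : 1;
}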
@@ -926,7 +912,6 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
   inc_initiate_write_reason(reason);
   set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
                   grpc_chttp2_initiate_write_reason_string(reason));
-  t->is_first_write_in_batch = true;
   GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
   /* Note that the 'write_action_begin_locked' closure is being scheduled
    * on the 'finally_scheduler' of t->combiner. This means that
@@ -967,50 +952,12 @@ void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
   }
 }
 
-static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
-                                               bool early_results_scheduled,
-                                               bool partial_write) {
-  // If we're already in a background poller, don't offload this to an executor
-  if (grpc_iomgr_is_any_background_poller_thread()) {
-    return grpc_schedule_on_exec_ctx;
-  }
-  /* if it's not the first write in a batch, always offload to the executor:
-     we'll probably end up queuing against the kernel anyway, so we'll likely
-     get better latency overall if we switch writing work elsewhere and continue
-     with application work above */
-  if (!t->is_first_write_in_batch) {
-    return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
-  }
-  /* equivalently, if it's a partial write, we *know* we're going to be taking a
-     thread jump to write it because of the above, may as well do so
-     immediately */
-  if (partial_write) {
-    return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
-  }
-  switch (t->opt_target) {
-    case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
-      /* executor gives us the largest probability of being able to batch a
-       * write with others on this transport */
-      return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
-    case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
-      return grpc_schedule_on_exec_ctx;
-  }
-  GPR_UNREACHABLE_CODE(return nullptr);
-}
-
-#define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
-
-static const char* begin_writing_desc(bool partial, bool inlined) {
-  switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) {
-    case WRITE_STATE_TUPLE_TO_INT(false, false):
-      return "begin write in background";
-    case WRITE_STATE_TUPLE_TO_INT(false, true):
-      return "begin write in current thread";
-    case WRITE_STATE_TUPLE_TO_INT(true, false):
-      return "begin partial write in background";
-    case WRITE_STATE_TUPLE_TO_INT(true, true):
-      return "begin partial write in current thread";
-  }
-  GPR_UNREACHABLE_CODE(return "bad state tuple");
+static const char* begin_writing_desc(bool partial) {
+  if (partial) {
+    return "begin partial write in background";
+  } else {
+    return "begin write in current thread";
+  }
 }
 
 static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
@@ -1027,22 +974,11 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
     if (r.partial) {
       GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
     }
-    if (!t->is_first_write_in_batch) {
-      GRPC_STATS_INC_HTTP2_WRITES_CONTINUED();
-    }
-    grpc_closure_scheduler* scheduler =
-        write_scheduler(t, r.early_results_scheduled, r.partial);
-    if (scheduler != grpc_schedule_on_exec_ctx) {
-      GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED();
-    }
-    set_write_state(
-        t,
-        r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
-                  : GRPC_CHTTP2_WRITE_STATE_WRITING,
-        begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
-    GRPC_CLOSURE_SCHED(
-        GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler),
-        GRPC_ERROR_NONE);
+    set_write_state(t,
+                    r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
+                              : GRPC_CHTTP2_WRITE_STATE_WRITING,
+                    begin_writing_desc(r.partial));
+    write_action(t, GRPC_ERROR_NONE);
     if (t->reading_paused_on_pending_induced_frames) {
       GPR_ASSERT(t->num_pending_induced_frames == 0);
       /* We had paused reading, because we had many induced frames (SETTINGS
@@ -1106,7 +1042,6 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
       GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
-      t->is_first_write_in_batch = false;
       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
       // If the transport is closed, we will retry writing on the endpoint
       // and next write may contain part of the currently serialized frames.
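
For orientation, the set_write_state calls in these hunks move the transport through a small write state machine: IDLE to WRITING when a write is initiated, WRITING_WITH_MORE when a batch could only be flushed partially, and back to WRITING ("continue writing") or IDLE when the write completes. A minimal toy sketch of that cycle, a simplified reading of the diff with stand-in types rather than the real transport code:

#include <cstdio>

// Toy mirror of the chttp2 write states referenced in the hunks above.
enum WriteState { IDLE, WRITING, WRITING_WITH_MORE };

struct Transport {
  WriteState write_state = IDLE;
};

// Something produced work to send.
void initiate_write(Transport* t) {
  if (t->write_state == IDLE) {
    t->write_state = WRITING;            // begin a write batch
  } else if (t->write_state == WRITING) {
    t->write_state = WRITING_WITH_MORE;  // more work arrived mid-batch
  }
}

// The endpoint finished flushing the current batch.
void write_finished(Transport* t) {
  if (t->write_state == WRITING_WITH_MORE) {
    t->write_state = WRITING;  // "continue writing": start the next batch
  } else {
    t->write_state = IDLE;
  }
}

int main() {
  Transport t;
  initiate_write(&t);  // IDLE -> WRITING
  initiate_write(&t);  // WRITING -> WRITING_WITH_MORE
  write_finished(&t);  // -> WRITING (continue)
  write_finished(&t);  // -> IDLE
  std::printf("final state: %d\n", static_cast<int>(t.write_state));
}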

--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -306,10 +306,6 @@ struct grpc_chttp2_transport {
   /** write execution state of the transport */
   grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
-  /** is this the first write in a series of writes?
-      set when we initiate writing from idle, cleared when we
-      initiate writing from writing+more */
-  bool is_first_write_in_batch = false;
 
   /** is the transport destroying itself? */
   uint8_t destroying = false;
@@ -319,8 +315,6 @@
   /** is there a read request to the endpoint outstanding? */
   uint8_t endpoint_reading = 1;
 
-  grpc_chttp2_optimization_target opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
-
   /** various lists of streams */
   grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};

--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -463,6 +463,17 @@ std::unique_ptr<ScenarioResult> RunScenario(
       gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
     }
   }
+  gpr_log(GPR_INFO, "Finishing servers");
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    if (!server->stream->Write(server_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+    }
+    if (!server->stream->WritesDone()) {
+      gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
+    }
+  }
+
   for (size_t i = 0; i < num_clients; i++) {
     auto client = &clients[i];
     // Read the client final status
@@ -499,16 +510,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
       rrc->set_count(it->second);
     }
-  gpr_log(GPR_INFO, "Finishing servers");
-  for (size_t i = 0; i < num_servers; i++) {
-    auto server = &servers[i];
-    if (!server->stream->Write(server_mark)) {
-      gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
-    }
-    if (!server->stream->WritesDone()) {
-      gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
-    }
-  }
   for (size_t i = 0; i < num_servers; i++) {
     auto server = &servers[i];
     // Read the server final status
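
The two driver.cc hunks move the "Finishing servers" block: the driver now writes the final mark and calls WritesDone() on the server streams right after doing the same for the client streams, before it blocks reading any final status, instead of after the client results have been read and post-processed. A hedged sketch of the resulting shutdown ordering, with placeholder stream and message types (not the real qps driver types):

#include <cstdio>
#include <vector>

// Placeholder for a bidi stream wrapper; only the calls used in the driver's
// shutdown path are modeled.
struct FakeStream {
  bool Write(int /*mark*/) { return true; }
  bool WritesDone() { return true; }
  bool ReadFinalStatus(int* out) { *out = 0; return true; }
};

// Half-close every peer first, then block on final statuses, so no peer is
// still waiting for our input while we wait on another peer's reply.
void Shutdown(std::vector<FakeStream>& clients,
              std::vector<FakeStream>& servers,
              int client_mark, int server_mark) {
  for (auto& c : clients) {
    if (!c.Write(client_mark)) std::puts("couldn't write mark to client");
    if (!c.WritesDone()) std::puts("client WritesDone failed");
  }
  for (auto& s : servers) {
    if (!s.Write(server_mark)) std::puts("couldn't write mark to server");
    if (!s.WritesDone()) std::puts("server WritesDone failed");
  }
  for (auto& c : clients) { int status; c.ReadFinalStatus(&status); }
  for (auto& s : servers) { int status; s.ReadFinalStatus(&status); }
}

int main() {
  std::vector<FakeStream> clients(2), servers(1);
  Shutdown(clients, servers, /*client_mark=*/1, /*server_mark=*/2);
}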
