diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index ec4a5d166b8..f1c457c0274 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -287,6 +287,14 @@ typedef struct { /** If non-zero, grpc server's cronet compression workaround will be enabled */ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ "grpc.workaround.cronet_compression" +/** String defining the optimization target for a channel. + Can be: "latency" - attempt to minimize latency at the cost of throughput + "blend" - try to balance latency and throughput + "throughput" - attempt to maximize throughput at the expense of + latency + Defaults to "blend". In the current implementation "blend" is equivalent to + "latency". */ +#define GRPC_ARG_OPTIMIZATION_TARGET "grpc.optimization_target" /** \} */ /** Result of a grpc call. If the caller satisfies the prerequisites of a diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 0ad63d1af2f..3b3782c1ec9 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -34,6 +34,7 @@ #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -269,8 +270,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_slice_buffer_init(&t->outbuf); grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); - GRPC_CLOSURE_INIT(&t->write_action, write_action, t, - grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, @@ -387,6 +386,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -475,6 +476,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_permit_without_calls = (uint32_t)grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){0, 0, 1}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_OPTIMIZATION_TARGET)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s should be a string", + GRPC_ARG_OPTIMIZATION_TARGET); + } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == + strcmp(channel_args->args[i].value.string, "throughput")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; + } else { + gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", + GRPC_ARG_OPTIMIZATION_TARGET, + channel_args->args[i].value.string); + } } else { static const struct { const char *channel_arg_name; @@ -528,6 +546,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + GRPC_CLOSURE_INIT(&t->write_action, write_action, t, + t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT + ? grpc_executor_scheduler + : grpc_schedule_on_exec_ctx); + t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; t->ping_state.is_delayed_ping_timer_set = false; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 4041b29fecb..4209d66c988 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -66,6 +66,11 @@ typedef enum { GRPC_CHTTP2_PING_TYPE_COUNT /* must be last */ } grpc_chttp2_ping_type; +typedef enum { + GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY, + GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT, +} grpc_chttp2_optimization_target; + typedef enum { GRPC_CHTTP2_PCL_INITIATE = 0, GRPC_CHTTP2_PCL_NEXT, @@ -229,6 +234,8 @@ struct grpc_chttp2_transport { /** should we probe bdp? */ bool enable_bdp_probe; + grpc_chttp2_optimization_target opt_target; + /** various lists of streams */ grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index 7bd6a3aa746..2177229a983 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -120,12 +120,14 @@ def _ping_pong_scenario(name, rpc_type, 'closed_loop': {} }, 'histogram_params': HISTOGRAM_PARAMS, + 'channel_args': [], }, 'server_config': { 'server_type': server_type, 'security_params': _get_secargs(secure), 'async_server_threads': async_server_threads, 'threads_per_cq': server_threads_per_cq, + 'channel_args': [], }, 'warmup_seconds': warmup_seconds, 'benchmark_seconds': BENCHMARK_SECONDS @@ -139,6 +141,8 @@ def _ping_pong_scenario(name, rpc_type, scenario['client_config']['payload_config'] = _payload_type(use_generic_payload, req_size, resp_size) + optimization_target = 'blend' + if unconstrained_client: outstanding_calls = outstanding if outstanding is not None else OUTSTANDING_REQUESTS[unconstrained_client] # clamp buffer usage to something reasonable (16 gig for now) @@ -152,10 +156,19 @@ def _ping_pong_scenario(name, rpc_type, scenario['client_config']['outstanding_rpcs_per_channel'] = deep scenario['client_config']['client_channels'] = wide scenario['client_config']['async_client_threads'] = 0 + optimization_target = 'throughput' else: scenario['client_config']['outstanding_rpcs_per_channel'] = 1 scenario['client_config']['client_channels'] = 1 scenario['client_config']['async_client_threads'] = 1 + optimization_target = 'latency' + + optimization_channel_arg = { + 'name': 'grpc.optimization_target', + 'str_value': optimization_target + } + scenario['client_config']['channel_args'].append(optimization_channel_arg) + scenario['server_config']['channel_args'].append(optimization_channel_arg) if messages_per_stream: scenario['client_config']['messages_per_stream'] = messages_per_stream