Improve benchmark automation:

1. Use monotonic time;
2. Improve readability;
3. Add two more fields for the number of client and server processes.
Branch: pull/21904/head
Author: Lidi Zheng, 5 years ago
Parent: 50a7c863dd
Commit: c22b161d4e
Changed files:

1. src/proto/grpc/testing/control.proto (6 changed lines)
2. src/python/grpcio_tests/tests_aio/benchmark/benchmark_client.py (5 changed lines)
3. src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py (70 changed lines)
4. tools/run_tests/performance/scenario_config.py (40 changed lines)
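
The headline change is the switch from time.time() to time.monotonic(). The wall clock can be stepped backwards or forwards (e.g. by NTP) while a benchmark runs, so an interval computed from time.time() can come out negative or inflated; time.monotonic() never goes backwards. A minimal sketch of the pattern the diff converges on (the measure() helper is illustrative, not part of the commit):

import time

def measure(fn):
    # Time a callable with the monotonic clock; immune to wall-clock steps.
    start = time.monotonic()
    fn()
    return time.monotonic() - start

if __name__ == '__main__':
    print('elapsed: %.6fs' % measure(lambda: sum(range(1000000))))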

@@ -117,6 +117,9 @@ message ClientConfig {
   // If 0, disabled. Else, specifies the period between gathering latency
   // medians in milliseconds.
   int32 median_latency_collection_interval_millis = 20;
+
+  // Number of client processes. 0 indicates no restriction.
+  int32 client_processes = 21;
 }

 message ClientStatus { ClientStats stats = 1; }
@@ -163,6 +166,9 @@ message ServerConfig {
   // Buffer pool size (no buffer pool specified if unset)
   int32 resource_quota_size = 1001;
   repeated ChannelArg channel_args = 1002;
+
+  // Number of server processes. 0 indicates no restriction.
+  int32 server_processes = 21;
 }

 message ServerArgs {
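
The two new fields let the driver cap the number of worker processes on each side, with 0 meaning "use one process per core" (see the _NUM_CORES fallback in worker_servicer.py below). A hedged sketch of how a driver might populate them, assuming Python stubs regenerated from control.proto:

# Sketch only: assumes control_pb2 regenerated from the proto above.
from src.proto.grpc.testing import control_pb2

client_config = control_pb2.ClientConfig(client_processes=0)  # 0: no restriction
server_config = control_pb2.ServerConfig(server_processes=1)  # single process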

@@ -104,9 +104,9 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
         self._stopped = asyncio.Event()

     async def _send_request(self):
-        start_time = time.time()
+        start_time = time.monotonic()
         await self._stub.UnaryCall(self._request)
-        self._record_query_time(time.time() - start_time)
+        self._record_query_time(time.monotonic() - start_time)

     async def _infinite_sender(self) -> None:
         while self._running:
@@ -141,7 +141,6 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
             await call.read()
             self._record_query_time(time.time() - start_time)
         await call.done_writing()
-        assert grpc.StatusCode.OK == await call.code()

     async def run(self):
         await super().run()
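
The deleted assert would also silently disappear under python -O. If status verification is still wanted, an explicit check outside the timed loop is the safer pattern; a hedged sketch, with call standing in for the grpc.aio streaming call object:

import grpc

async def _finish_stream(call):
    # Sketch: explicit status check instead of an assert, which -O strips.
    await call.done_writing()
    code = await call.code()
    if code != grpc.StatusCode.OK:
        raise RuntimeError('Streaming call failed with status: %s' % code)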

@@ -55,8 +55,9 @@ class _SubWorker(
 def _get_server_status(start_time: float, end_time: float,
                        port: int) -> control_pb2.ServerStatus:
     """Creates ServerStatus proto message."""
-    end_time = time.time()
+    end_time = time.monotonic()
     elapsed_time = end_time - start_time
+    # TODO(lidiz) Collect accurate time system to compute QPS/core-second.
     stats = stats_pb2.ServerStats(time_elapsed=elapsed_time,
                                   time_user=elapsed_time,
                                   time_system=elapsed_time)
@@ -108,8 +109,9 @@ def _get_client_status(start_time: float, end_time: float,
 ) -> control_pb2.ClientStatus:
     """Creates ClientStatus proto message."""
     latencies = qps_data.get_data()
-    end_time = time.time()
+    end_time = time.monotonic()
     elapsed_time = end_time - start_time
+    # TODO(lidiz) Collect accurate time system to compute QPS/core-second.
     stats = stats_pb2.ClientStats(latencies=latencies,
                                   time_elapsed=elapsed_time,
                                   time_user=elapsed_time,
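
Both helpers still report wall-clock elapsed time for time_user and time_system, which the new TODO acknowledges. One way the TODO could later be resolved is with per-process CPU times from the standard library; a hedged sketch (_cpu_times is illustrative, and the resource module is Unix-only):

import resource
import time

def _cpu_times():
    # Sketch of resolving the TODO: real user/system CPU seconds for this
    # process instead of echoing the wall-clock elapsed time.
    usage = resource.getrusage(resource.RUSAGE_SELF)
    return usage.ru_utime, usage.ru_stime

start = time.monotonic()
sum(range(10000000))  # stand-in workload
time_user, time_system = _cpu_times()
print('elapsed=%.3fs user=%.3fs system=%.3fs' %
      (time.monotonic() - start, time_user, time_system))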
@@ -181,11 +183,11 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
         await server.start()
         _LOGGER.info('Server started at port [%d]', port)

-        start_time = time.time()
+        start_time = time.monotonic()
         await context.write(_get_server_status(start_time, start_time, port))

         async for request in request_iterator:
-            end_time = time.time()
+            end_time = time.monotonic()
             status = _get_server_status(start_time, end_time, port)
             if request.mark.reset:
                 start_time = end_time
@@ -197,35 +199,32 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
         config = config_request.setup
         _LOGGER.info('Received ServerConfig: %s', config)

-        if config.async_server_threads <= 0:
-            _LOGGER.info('async_server_threads can\'t be [%d]',
-                         config.async_server_threads)
-            _LOGGER.info('Using async_server_threads == [%d]', _NUM_CORES)
-            config.async_server_threads = _NUM_CORES
+        if config.server_processes <= 0:
+            _LOGGER.info('Using server_processes == [%d]', _NUM_CORES)
+            config.server_processes = _NUM_CORES

         if config.port == 0:
             config.port = _pick_an_unused_port()
         _LOGGER.info('Port picked [%d]', config.port)

-        if config.async_server_threads == 1:
-            # If async_server_threads == 1, start the server in this process.
+        if config.server_processes == 1:
+            # If server_processes == 1, start the server in this process.
             await self._run_single_server(config, request_iterator, context)
         else:
-            # If async_server_threads > 1, offload to other processes.
+            # If server_processes > 1, offload to other processes.
             sub_workers = await asyncio.gather(*(
-                _create_sub_worker()
-                for _ in range(config.async_server_threads)))
+                _create_sub_worker() for _ in range(config.server_processes)))

             calls = [worker.stub.RunServer() for worker in sub_workers]

-            config_request.setup.async_server_threads = 1
+            config_request.setup.server_processes = 1

             for call in calls:
                 await call.write(config_request)
                 # An empty status indicates the peer is ready
                 await call.read()

-            start_time = time.time()
+            start_time = time.monotonic()
             await context.write(
                 _get_server_status(
                     start_time,
@@ -236,7 +235,7 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
             _LOGGER.info('Servers are ready to serve.')

             async for request in request_iterator:
-                end_time = time.time()
+                end_time = time.monotonic()

                 for call in calls:
                     await call.write(request)
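
The fan-out path is symmetric on the server and client sides: spawn server_processes sub-workers, rewrite the forwarded config to server_processes = 1 so each sub-worker runs in-process rather than recursing, then broadcast every subsequent driver request to all open calls. A condensed sketch of the server-side pattern (_create_sub_worker and the stub method come from the diff; _fan_out_servers itself is illustrative):

import asyncio

async def _fan_out_servers(config_request, create_sub_worker):
    num = config_request.setup.server_processes
    sub_workers = await asyncio.gather(
        *(create_sub_worker() for _ in range(num)))
    calls = [worker.stub.RunServer() for worker in sub_workers]
    # Each sub-worker must itself run single-process.
    config_request.setup.server_processes = 1
    for call in calls:
        await call.write(config_request)
        await call.read()  # An empty status indicates the peer is ready.
    return calls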
@@ -265,7 +264,7 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
         running_tasks = []
         qps_data = histogram.Histogram(config.histogram_params.resolution,
                                        config.histogram_params.max_possible)
-        start_time = time.time()
+        start_time = time.monotonic()

         # Create a client for each channel as asyncio.Task
         for i in range(config.client_channels):
@@ -274,16 +273,16 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
             _LOGGER.info('Client created against server [%s]', server)
             running_tasks.append(self._loop.create_task(client.run()))

-        end_time = time.time()
+        end_time = time.monotonic()
         await context.write(_get_client_status(start_time, end_time, qps_data))

         # Respond to stat requests
         async for request in request_iterator:
-            end_time = time.time()
+            end_time = time.monotonic()
             status = _get_client_status(start_time, end_time, qps_data)
             if request.mark.reset:
                 qps_data.reset()
-                start_time = time.time()
+                start_time = time.monotonic()
             await context.write(status)

         # Cleanup the clients
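
This stat loop implements the driver's Mark protocol: every incoming request gets the stats accumulated since start_time, and request.mark.reset starts a fresh measurement window. A minimal sketch of that bookkeeping (_StatWindow is illustrative; get_data()/reset() are the histogram methods used above):

import time

class _StatWindow:
    """Sketch of the reset-on-mark bookkeeping in the stat loops above."""

    def __init__(self, histogram):
        self._histogram = histogram
        self._start = time.monotonic()

    def report(self, reset):
        elapsed = time.monotonic() - self._start
        snapshot = self._histogram.get_data()  # stats since the last reset
        if reset:
            self._histogram.reset()
            self._start = time.monotonic()
        return snapshot, elapsed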
@@ -295,39 +294,38 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
         config = config_request.setup
         _LOGGER.info('Received ClientConfig: %s', config)

-        if config.async_client_threads <= 0:
-            _LOGGER.info('async_client_threads can\'t be [%d]',
-                         config.async_client_threads)
-            _LOGGER.info('Using async_client_threads == [%d]', _NUM_CORES)
-            config.async_client_threads = _NUM_CORES
+        if config.client_processes <= 0:
+            _LOGGER.info('client_processes can\'t be [%d]',
+                         config.client_processes)
+            _LOGGER.info('Using client_processes == [%d]', _NUM_CORES)
+            config.client_processes = _NUM_CORES

-        if config.async_client_threads == 1:
-            # If async_client_threads == 1, run the benchmark in this process.
+        if config.client_processes == 1:
+            # If client_processes == 1, run the benchmark in this process.
             await self._run_single_client(config, request_iterator, context)
         else:
-            # If async_client_threads > 1, offload the work to other processes.
+            # If client_processes > 1, offload the work to other processes.
             sub_workers = await asyncio.gather(*(
-                _create_sub_worker()
-                for _ in range(config.async_client_threads)))
+                _create_sub_worker() for _ in range(config.client_processes)))

             calls = [worker.stub.RunClient() for worker in sub_workers]

-            config_request.setup.async_client_threads = 1
+            config_request.setup.client_processes = 1

             for call in calls:
                 await call.write(config_request)
                 # An empty status indicates the peer is ready
                 await call.read()

-            start_time = time.time()
+            start_time = time.monotonic()
             result = histogram.Histogram(config.histogram_params.resolution,
                                          config.histogram_params.max_possible)
-            end_time = time.time()
+            end_time = time.monotonic()
             await context.write(_get_client_status(start_time, end_time,
                                                    result))

             async for request in request_iterator:
-                end_time = time.time()
+                end_time = time.monotonic()

                 for call in calls:
                     _LOGGER.debug('Fetching status...')
@@ -340,7 +338,7 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
                 status = _get_client_status(start_time, end_time, result)
                 if request.mark.reset:
                     result.reset()
-                    start_time = time.time()
+                    start_time = time.monotonic()
                 _LOGGER.debug('Reporting count=[%d]',
                               status.stats.latencies.count)
                 await context.write(status)
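
In the multi-process client path, the parent worker forwards each Mark request to every sub-worker and folds the returned latency data into the single result histogram it reports upstream. A hedged sketch of that collection step (_collect_round is illustrative, and it assumes the test suite's histogram.Histogram.merge() accepts the HistogramData each sub-worker reports):

async def _collect_round(request, calls, result):
    # Broadcast the driver's Mark request, then merge each sub-worker's
    # latency histogram into the aggregate `result`.
    for call in calls:
        await call.write(request)
        sub_status = await call.read()
        result.merge(sub_status.stats.latencies)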

@@ -118,8 +118,9 @@ def _ping_pong_scenario(name,
                         unconstrained_client=None,
                         client_language=None,
                         server_language=None,
-                        async_client_threads=0,
                         async_server_threads=0,
+                        client_processes=0,
+                        server_processes=0,
                         server_threads_per_cq=0,
                         client_threads_per_cq=0,
                         warmup_seconds=WARMUP_SECONDS,
@@ -143,6 +144,7 @@ def _ping_pong_scenario(name,
             'outstanding_rpcs_per_channel': 1,
             'client_channels': 1,
             'async_client_threads': 1,
+            'client_processes': client_processes,
             'threads_per_cq': client_threads_per_cq,
             'rpc_type': rpc_type,
             'histogram_params': HISTOGRAM_PARAMS,
@@ -152,6 +154,7 @@ def _ping_pong_scenario(name,
             'server_type': server_type,
             'security_params': _get_secargs(secure),
             'async_server_threads': async_server_threads,
+            'server_processes': server_processes,
             'threads_per_cq': server_threads_per_cq,
             'channel_args': [],
         },
@@ -188,7 +191,7 @@ def _ping_pong_scenario(name,
             'num_clients'] = num_clients if num_clients is not None else 0  # use as many clients as available.
         scenario['client_config']['outstanding_rpcs_per_channel'] = deep
         scenario['client_config']['client_channels'] = wide
-        scenario['client_config']['async_client_threads'] = async_client_threads
+        scenario['client_config']['async_client_threads'] = 0
         if offered_load is not None:
             optimization_target = 'latency'
         else:
@@ -220,7 +223,7 @@ def _ping_pong_scenario(name,
         scenario['SERVER_LANGUAGE'] = server_language
     if categories:
         scenario['CATEGORIES'] = categories
-    if len(excluded_poll_engines):
+    if excluded_poll_engines:
        # The polling engines for which this scenario is excluded
        scenario['EXCLUDED_POLL_ENGINES'] = excluded_poll_engines
     return scenario
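
The len() change is purely idiomatic: empty sequences are already falsy in Python, so `if excluded_poll_engines:` reads the same and matches PEP 8. For example:

excluded_poll_engines = []
assert not excluded_poll_engines      # empty list is falsy
excluded_poll_engines.append('poll')
assert excluded_poll_engines          # non-empty list is truthy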
@@ -794,7 +797,7 @@ class PythonLanguage:
             client_type='SYNC_CLIENT',
             server_type='ASYNC_SERVER',
             server_language='c++',
-            async_server_threads=1,
+            async_server_threads=0,
             categories=[SMOKETEST, SCALABLE])

         yield _ping_pong_scenario(
@@ -842,6 +845,8 @@ class PythonAsyncIOLanguage:
                     server_type='ASYNC_SERVER',
                     outstanding=outstanding * channels,
                     channels=channels,
+                    client_processes=0,
+                    server_processes=0,
                     unconstrained_client='async',
                     categories=[SCALABLE])
@@ -853,8 +858,8 @@ class PythonAsyncIOLanguage:
                     server_type='ASYNC_SERVER',
                     outstanding=outstanding,
                     channels=1,
-                    async_client_threads=1,
-                    async_server_threads=1,
+                    client_processes=1,
+                    server_processes=1,
                     unconstrained_client='async',
                     categories=[SCALABLE])
@@ -863,7 +868,8 @@ class PythonAsyncIOLanguage:
             rpc_type='STREAMING',
             client_type='ASYNC_CLIENT',
             server_type='ASYNC_GENERIC_SERVER',
-            async_server_threads=1,
+            client_processes=1,
+            server_processes=1,
             use_generic_payload=True,
             categories=[SMOKETEST, SCALABLE])
@@ -872,7 +878,8 @@ class PythonAsyncIOLanguage:
             rpc_type='STREAMING',
             client_type='ASYNC_CLIENT',
             server_type='ASYNC_SERVER',
-            async_server_threads=1,
+            client_processes=1,
+            server_processes=1,
             categories=[SMOKETEST, SCALABLE])

         yield _ping_pong_scenario(
@@ -880,7 +887,8 @@ class PythonAsyncIOLanguage:
             rpc_type='UNARY',
             client_type='ASYNC_CLIENT',
             server_type='ASYNC_SERVER',
-            async_server_threads=1,
+            client_processes=1,
+            server_processes=1,
             categories=[SMOKETEST, SCALABLE])

         yield _ping_pong_scenario(
@@ -888,7 +896,8 @@ class PythonAsyncIOLanguage:
             rpc_type='UNARY',
             client_type='ASYNC_CLIENT',
             server_type='ASYNC_SERVER',
-            async_server_threads=1,
+            client_processes=1,
+            server_processes=1,
             categories=[SMOKETEST, SCALABLE])

         yield _ping_pong_scenario(
@@ -911,6 +920,8 @@ class PythonAsyncIOLanguage:
             client_type='ASYNC_CLIENT',
             server_type='ASYNC_SERVER',
             server_language='c++',
+            client_processes=1,
             unconstrained_client='async',
             categories=[SMOKETEST, SCALABLE])

         yield _ping_pong_scenario(
@@ -919,15 +930,18 @@ class PythonAsyncIOLanguage:
             client_type='ASYNC_CLIENT',
             server_type='ASYNC_SERVER',
             unconstrained_client='async',
-            async_client_threads=0,
+            client_processes=0,
             server_language='c++',
             categories=[SMOKETEST, SCALABLE])

         yield _ping_pong_scenario(
-            'python_asyncio_to_cpp_protobuf_sync_streaming_ping_pong',
+            'python_asyncio_to_cpp_protobuf_sync_streaming_ping_pong_1thread',
             rpc_type='STREAMING',
             client_type='ASYNC_CLIENT',
             server_type='ASYNC_SERVER',
+            client_processes=1,
+            server_processes=1,
             unconstrained_client='async',
             server_language='c++')

         yield _ping_pong_scenario(
@@ -937,6 +951,8 @@ class PythonAsyncIOLanguage:
             server_type='ASYNC_SERVER',
             req_size=1024 * 1024,
             resp_size=1024 * 1024,
+            client_processes=1,
+            server_processes=1,
             categories=[SMOKETEST, SCALABLE])

     def __str__(self):
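
End to end, the new knobs flow from these scenario definitions into the generated scenario JSON, which the driver sends to workers as ClientConfig/ServerConfig. An illustrative slice of the output for the renamed _1thread scenario above (field names come from this diff; other fields are omitted):

scenario = {
    'name': 'python_asyncio_to_cpp_protobuf_sync_streaming_ping_pong_1thread',
    'client_config': {
        'async_client_threads': 0,  # now always 0 for unconstrained clients
        'client_processes': 1,      # new field: 0 would mean one per core
    },
    'server_config': {
        'async_server_threads': 0,
        'server_processes': 1,      # new field: 0 would mean one per core
    },
}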
