From 2d81bd203f0fd19b8d676942a7e8460b659769ce Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 5 Feb 2020 17:42:55 -0800 Subject: [PATCH] Introduce sub workers --- .../grpcio_tests/tests/qps/histogram.py | 10 +++ .../tests_aio/benchmark/worker_servicer.py | 86 +++++++++++++++++-- .../run_tests/performance/scenario_config.py | 4 +- 3 files changed, 91 insertions(+), 9 deletions(-) diff --git a/src/python/grpcio_tests/tests/qps/histogram.py b/src/python/grpcio_tests/tests/qps/histogram.py index f198630b7d9..e0f06ffa9ec 100644 --- a/src/python/grpcio_tests/tests/qps/histogram.py +++ b/src/python/grpcio_tests/tests/qps/histogram.py @@ -65,6 +65,16 @@ class Histogram(object): data.count = self._count return data + def merge(self, another_data): + with self._lock: + for i in len(self._buckets): + self._buckets[i] += another_data.bucket[i] + self._min = min(self._min, another_data.min_seen) + self._max = max(self._max, another_data.max_seen) + self._sum += another_data.sum + self._sum_of_squares += another_data.sum_of_squares + self._count += another_data.count + def _bucket_for(self, val): val = min(val, self._max_possible) return int(math.log(val, self.multiplier)) diff --git a/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py b/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py index cefe7c1834e..f6bafd35703 100644 --- a/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py +++ b/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py @@ -14,9 +14,12 @@ import asyncio import logging +import os import multiprocessing +import sys import time from typing import Tuple +import collections import grpc from grpc.experimental import aio @@ -117,6 +120,26 @@ def _create_client(server: str, config: control_pb2.ClientConfig, return client_type(server, config, qps_data) +WORKER_ENTRY_FILE = os.path.split(os.path.abspath(__file__))[0] + 'worker.py' +SubWorker = collections.namedtuple('SubWorker', ['process', 'port', 'channel', 'stub']) + + +async def _create_sub_worker(port: int) -> SubWorker: + process = asyncio.create_subprocess_exec( + sys.executable, + WORKER_ENTRY_FILE, + '--driver_port', port + ) + channel = aio.insecure_channel(f'localhost:{port}') + stub = worker_service_pb2_grpc.WorkerServiceStub(channel) + return SubWorker( + process=process, + port=port, + channel=channel, + stub=stub, + ) + + class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): """Python Worker Server implementation.""" @@ -143,10 +166,7 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): yield status await server.stop(None) - async def RunClient(self, request_iterator, context): - config = (await context.read()).setup - _LOGGER.info('Received ClientConfig: %s', config) - + async def _run_single_client(self, config, request_iterator, context): running_tasks = [] qps_data = histogram.Histogram(config.histogram_params.resolution, config.histogram_params.max_possible) @@ -160,7 +180,7 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): running_tasks.append(self._loop.create_task(client.run())) end_time = time.time() - yield _get_client_status(start_time, end_time, qps_data) + await context.write(_get_client_status(start_time, end_time, qps_data)) # Respond to stat requests async for request in request_iterator: @@ -169,16 +189,66 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): if request.mark.reset: qps_data.reset() start_time = time.time() - yield status + await context.write(status) # Cleanup the clients for task in running_tasks: task.cancel() - async def CoreCount(self, request, context): + async def RunClient(self, request_iterator, context): + config_request = await context.read() + config = config_request.setup + _LOGGER.info('Received ClientConfig: %s', config) + + if config.async_server_threads <= 0: + raise ValueError('async_server_threads can\'t be [%d]' % config.async_server_threads) + elif config.async_server_threads == 1: + await self._run_single_client(config, request_iterator, context) + else: + sub_workers = [] + for i in range(config.async_server_threads): + port = 40000+i + _LOGGER.info('Creating sub worker at port [%d]...', port) + sub_workers.append(await _create_sub_worker(port)) + + calls = [worker.stub.RunClient() for worker in sub_workers] + + for call in calls: + await call.write(config_request) + + start_time = time.time() + result = histogram.Histogram(config.histogram_params.resolution, + config.histogram_params.max_possible) + end_time = time.time() + yield _get_client_status(start_time, end_time, result) + + async for request in request_iterator: + end_time = time.time() + + for call in calls: + await call.write(request) + sub_status = await call.read() + result.merge(sub_status.latencies) + + status = _get_client_status(start_time, end_time, result) + if request.mark.reset: + result.reset() + start_time = time.time() + yield status + + for call in calls: + await call.QuitWorker() + + for worker in sub_workers: + await worker.channel.close() + _LOGGER.info('Waiting for sub worker [%s] to quit...', worker) + await worker.process.wait() + _LOGGER.info('Sub worker [%s] quit', worker) + + async def CoreCount(self, unused_request, unused_context): return control_pb2.CoreResponse(cores=_NUM_CORES) - async def QuitWorker(self, request, context): + async def QuitWorker(self, unused_request, unused_context): _LOGGER.info('QuitWorker command received.') self._quit_event.set() return control_pb2.Void() diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index d1e8ebbd148..5a4fddf16e6 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -118,6 +118,7 @@ def _ping_pong_scenario(name, unconstrained_client=None, client_language=None, server_language=None, + async_client_threads=0, async_server_threads=0, server_threads_per_cq=0, client_threads_per_cq=0, @@ -187,7 +188,7 @@ def _ping_pong_scenario(name, 'num_clients'] = num_clients if num_clients is not None else 0 # use as many clients as available. scenario['client_config']['outstanding_rpcs_per_channel'] = deep scenario['client_config']['client_channels'] = wide - scenario['client_config']['async_client_threads'] = 0 + scenario['client_config']['async_client_threads'] = async_client_threads if offered_load is not None: optimization_target = 'latency' else: @@ -837,6 +838,7 @@ class PythonAsyncIOLanguage: channels=1, client_threads_per_cq=1, server_threads_per_cq=1, + async_server_threads=6, unconstrained_client=True, categories=[SMOKETEST, SCALABLE])