diff --git a/src/python/grpcio_tests/tests/qps/histogram.py b/src/python/grpcio_tests/tests/qps/histogram.py index e0f06ffa9ec..8139a6ee2fb 100644 --- a/src/python/grpcio_tests/tests/qps/histogram.py +++ b/src/python/grpcio_tests/tests/qps/histogram.py @@ -67,7 +67,7 @@ class Histogram(object): def merge(self, another_data): with self._lock: - for i in len(self._buckets): + for i in range(len(self._buckets)): self._buckets[i] += another_data.bucket[i] self._min = min(self._min, another_data.min_seen) self._max = max(self._max, another_data.max_seen) diff --git a/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py b/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py index f6bafd35703..ee72165019c 100644 --- a/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py +++ b/src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py @@ -120,17 +120,20 @@ def _create_client(server: str, config: control_pb2.ClientConfig, return client_type(server, config, qps_data) -WORKER_ENTRY_FILE = os.path.split(os.path.abspath(__file__))[0] + 'worker.py' +WORKER_ENTRY_FILE = os.path.split(os.path.abspath(__file__))[0] + '/worker.py' SubWorker = collections.namedtuple('SubWorker', ['process', 'port', 'channel', 'stub']) async def _create_sub_worker(port: int) -> SubWorker: - process = asyncio.create_subprocess_exec( + process = await asyncio.create_subprocess_exec( sys.executable, WORKER_ENTRY_FILE, - '--driver_port', port + '--driver_port', str(port) ) + _LOGGER.info('Created sub worker process for port [%d] at pid [%d]', port, process.pid) channel = aio.insecure_channel(f'localhost:{port}') + _LOGGER.info('Waiting for sub worker at port [%d]', port) + await aio.channel_ready(channel) stub = worker_service_pb2_grpc.WorkerServiceStub(channel) return SubWorker( process=process, @@ -200,46 +203,54 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): config = config_request.setup _LOGGER.info('Received ClientConfig: %s', config) - if config.async_server_threads <= 0: - raise ValueError('async_server_threads can\'t be [%d]' % config.async_server_threads) - elif config.async_server_threads == 1: + if config.async_client_threads <= 0: + raise ValueError('async_client_threads can\'t be [%d]' % config.async_client_threads) + elif config.async_client_threads == 1: await self._run_single_client(config, request_iterator, context) else: sub_workers = [] - for i in range(config.async_server_threads): + for i in range(config.async_client_threads): port = 40000+i _LOGGER.info('Creating sub worker at port [%d]...', port) sub_workers.append(await _create_sub_worker(port)) calls = [worker.stub.RunClient() for worker in sub_workers] + config_request.setup.async_client_threads = 1 + for call in calls: await call.write(config_request) + # An empty status + await call.read() start_time = time.time() result = histogram.Histogram(config.histogram_params.resolution, config.histogram_params.max_possible) end_time = time.time() - yield _get_client_status(start_time, end_time, result) + await context.write(_get_client_status(start_time, end_time, result)) async for request in request_iterator: end_time = time.time() for call in calls: + _LOGGER.debug('Fetching status...') await call.write(request) sub_status = await call.read() - result.merge(sub_status.latencies) + result.merge(sub_status.stats.latencies) + _LOGGER.debug('Update from sub worker count=[%d]', sub_status.stats.latencies.count) status = _get_client_status(start_time, end_time, result) if request.mark.reset: result.reset() start_time = time.time() - yield status + _LOGGER.debug('Reporting count=[%d]', status.stats.latencies.count) + await context.write(status) for call in calls: - await call.QuitWorker() + await call.done_writing() for worker in sub_workers: + await worker.stub.QuitWorker(control_pb2.Void()) await worker.channel.close() _LOGGER.info('Waiting for sub worker [%s] to quit...', worker) await worker.process.wait() diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index 5a4fddf16e6..e935539605c 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -838,7 +838,7 @@ class PythonAsyncIOLanguage: channels=1, client_threads_per_cq=1, server_threads_per_cq=1, - async_server_threads=6, + async_client_threads=7, unconstrained_client=True, categories=[SMOKETEST, SCALABLE])