|
|
|
@ -29,6 +29,7 @@ from src.proto.grpc.testing import (benchmark_service_pb2_grpc, control_pb2, |
|
|
|
|
from tests.qps import histogram |
|
|
|
|
from tests.unit import resources |
|
|
|
|
from tests_aio.benchmark import benchmark_client, benchmark_servicer |
|
|
|
|
from tests.unit.framework.common import get_socket |
|
|
|
|
|
|
|
|
|
_NUM_CORES = multiprocessing.cpu_count() |
|
|
|
|
_NUM_CORE_PYTHON_CAN_USE = 1 |
|
|
|
@ -124,14 +125,17 @@ WORKER_ENTRY_FILE = os.path.split(os.path.abspath(__file__))[0] + '/worker.py' |
|
|
|
|
SubWorker = collections.namedtuple('SubWorker', ['process', 'port', 'channel', 'stub']) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _create_sub_worker(port: int) -> SubWorker: |
|
|
|
|
async def _create_sub_worker() -> SubWorker: |
|
|
|
|
address, port, sock = get_socket() |
|
|
|
|
sock.close() |
|
|
|
|
_LOGGER.info('Creating sub worker at port [%d]...', port) |
|
|
|
|
process = await asyncio.create_subprocess_exec( |
|
|
|
|
sys.executable, |
|
|
|
|
WORKER_ENTRY_FILE, |
|
|
|
|
'--driver_port', str(port) |
|
|
|
|
) |
|
|
|
|
_LOGGER.info('Created sub worker process for port [%d] at pid [%d]', port, process.pid) |
|
|
|
|
channel = aio.insecure_channel(f'localhost:{port}') |
|
|
|
|
channel = aio.insecure_channel(f'{address}:{port}') |
|
|
|
|
_LOGGER.info('Waiting for sub worker at port [%d]', port) |
|
|
|
|
await aio.channel_ready(channel) |
|
|
|
|
stub = worker_service_pb2_grpc.WorkerServiceStub(channel) |
|
|
|
@ -210,9 +214,7 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): |
|
|
|
|
else: |
|
|
|
|
sub_workers = [] |
|
|
|
|
for i in range(config.async_client_threads): |
|
|
|
|
port = 40000+i |
|
|
|
|
_LOGGER.info('Creating sub worker at port [%d]...', port) |
|
|
|
|
sub_workers.append(await _create_sub_worker(port)) |
|
|
|
|
sub_workers.append(await _create_sub_worker()) |
|
|
|
|
|
|
|
|
|
calls = [worker.stub.RunClient() for worker in sub_workers] |
|
|
|
|
|
|
|
|
@ -237,7 +239,8 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer): |
|
|
|
|
await call.write(request) |
|
|
|
|
sub_status = await call.read() |
|
|
|
|
result.merge(sub_status.stats.latencies) |
|
|
|
|
_LOGGER.debug('Update from sub worker count=[%d]', sub_status.stats.latencies.count) |
|
|
|
|
_LOGGER.debug('Update from sub worker count=[%d]', |
|
|
|
|
sub_status.stats.latencies.count) |
|
|
|
|
|
|
|
|
|
status = _get_client_status(start_time, end_time, result) |
|
|
|
|
if request.mark.reset: |
|
|
|
|