Fix the sub worker

pull/21904/head
Lidi Zheng 5 years ago
parent 2d81bd203f
commit 5e2114d92f
  1. 2
      src/python/grpcio_tests/tests/qps/histogram.py
  2. 33
      src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py
  3. 2
      tools/run_tests/performance/scenario_config.py

@ -67,7 +67,7 @@ class Histogram(object):
def merge(self, another_data):
with self._lock:
for i in len(self._buckets):
for i in range(len(self._buckets)):
self._buckets[i] += another_data.bucket[i]
self._min = min(self._min, another_data.min_seen)
self._max = max(self._max, another_data.max_seen)

@ -120,17 +120,20 @@ def _create_client(server: str, config: control_pb2.ClientConfig,
return client_type(server, config, qps_data)
WORKER_ENTRY_FILE = os.path.split(os.path.abspath(__file__))[0] + 'worker.py'
WORKER_ENTRY_FILE = os.path.split(os.path.abspath(__file__))[0] + '/worker.py'
SubWorker = collections.namedtuple('SubWorker', ['process', 'port', 'channel', 'stub'])
async def _create_sub_worker(port: int) -> SubWorker:
process = asyncio.create_subprocess_exec(
process = await asyncio.create_subprocess_exec(
sys.executable,
WORKER_ENTRY_FILE,
'--driver_port', port
'--driver_port', str(port)
)
_LOGGER.info('Created sub worker process for port [%d] at pid [%d]', port, process.pid)
channel = aio.insecure_channel(f'localhost:{port}')
_LOGGER.info('Waiting for sub worker at port [%d]', port)
await aio.channel_ready(channel)
stub = worker_service_pb2_grpc.WorkerServiceStub(channel)
return SubWorker(
process=process,
@ -200,46 +203,54 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
config = config_request.setup
_LOGGER.info('Received ClientConfig: %s', config)
if config.async_server_threads <= 0:
raise ValueError('async_server_threads can\'t be [%d]' % config.async_server_threads)
elif config.async_server_threads == 1:
if config.async_client_threads <= 0:
raise ValueError('async_client_threads can\'t be [%d]' % config.async_client_threads)
elif config.async_client_threads == 1:
await self._run_single_client(config, request_iterator, context)
else:
sub_workers = []
for i in range(config.async_server_threads):
for i in range(config.async_client_threads):
port = 40000+i
_LOGGER.info('Creating sub worker at port [%d]...', port)
sub_workers.append(await _create_sub_worker(port))
calls = [worker.stub.RunClient() for worker in sub_workers]
config_request.setup.async_client_threads = 1
for call in calls:
await call.write(config_request)
# An empty status
await call.read()
start_time = time.time()
result = histogram.Histogram(config.histogram_params.resolution,
config.histogram_params.max_possible)
end_time = time.time()
yield _get_client_status(start_time, end_time, result)
await context.write(_get_client_status(start_time, end_time, result))
async for request in request_iterator:
end_time = time.time()
for call in calls:
_LOGGER.debug('Fetching status...')
await call.write(request)
sub_status = await call.read()
result.merge(sub_status.latencies)
result.merge(sub_status.stats.latencies)
_LOGGER.debug('Update from sub worker count=[%d]', sub_status.stats.latencies.count)
status = _get_client_status(start_time, end_time, result)
if request.mark.reset:
result.reset()
start_time = time.time()
yield status
_LOGGER.debug('Reporting count=[%d]', status.stats.latencies.count)
await context.write(status)
for call in calls:
await call.QuitWorker()
await call.done_writing()
for worker in sub_workers:
await worker.stub.QuitWorker(control_pb2.Void())
await worker.channel.close()
_LOGGER.info('Waiting for sub worker [%s] to quit...', worker)
await worker.process.wait()

@ -838,7 +838,7 @@ class PythonAsyncIOLanguage:
channels=1,
client_threads_per_cq=1,
server_threads_per_cq=1,
async_server_threads=6,
async_client_threads=7,
unconstrained_client=True,
categories=[SMOKETEST, SCALABLE])

Loading…
Cancel
Save