Adding more comments

pull/21904/head
Lidi Zheng 5 years ago
parent 72514e9bf4
commit f89448a9d5
  1 changed file, 21 lines modified
      src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py

@ -13,13 +13,13 @@
# limitations under the License.
import asyncio
import collections
import logging
import os
import multiprocessing
import os
import sys
import time
from typing import Tuple
import collections
import grpc
from grpc.experimental import aio
@ -28,11 +28,10 @@ from src.proto.grpc.testing import (benchmark_service_pb2_grpc, control_pb2,
stats_pb2, worker_service_pb2_grpc)
from tests.qps import histogram
from tests.unit import resources
from tests_aio.benchmark import benchmark_client, benchmark_servicer
from tests.unit.framework.common import get_socket
from tests_aio.benchmark import benchmark_client, benchmark_servicer
_NUM_CORES = multiprocessing.cpu_count()
_NUM_CORE_PYTHON_CAN_USE = 1
_WORKER_ENTRY_FILE = os.path.split(os.path.abspath(__file__))[0] + '/worker.py'
_LOGGER = logging.getLogger(__name__)
@ -41,6 +40,7 @@ _LOGGER = logging.getLogger(__name__)
class _SubWorker(
collections.namedtuple('_SubWorker',
['process', 'port', 'channel', 'stub'])):
"""A data class that holds information about a child qps worker."""
def _repr(self):
return f'<_SubWorker pid={self.process.pid} port={self.port}>'
@ -54,6 +54,7 @@ class _SubWorker(
def _get_server_status(start_time: float, end_time: float,
port: int) -> control_pb2.ServerStatus:
"""Creates ServerStatus proto message."""
end_time = time.time()
elapsed_time = end_time - start_time
stats = stats_pb2.ServerStats(time_elapsed=elapsed_time,
@ -63,6 +64,7 @@ def _get_server_status(start_time: float, end_time: float,
def _create_server(config: control_pb2.ServerConfig) -> Tuple[aio.Server, int]:
"""Creates a server object according to the ServerConfig."""
channel_args = tuple(
(arg.name,
arg.str_value) if arg.HasField('str_value') else (arg.name,
@ -104,6 +106,7 @@ def _create_server(config: control_pb2.ServerConfig) -> Tuple[aio.Server, int]:
def _get_client_status(start_time: float, end_time: float,
qps_data: histogram.Histogram
) -> control_pb2.ClientStatus:
"""Creates ClientStatus proto message."""
latencies = qps_data.get_data()
end_time = time.time()
elapsed_time = end_time - start_time
@ -117,6 +120,7 @@ def _get_client_status(start_time: float, end_time: float,
def _create_client(server: str, config: control_pb2.ClientConfig,
qps_data: histogram.Histogram
) -> benchmark_client.BenchmarkClient:
"""Creates a client object according to the ClientConfig."""
if config.load_params.WhichOneof('load') != 'closed_loop':
raise NotImplementedError(
f'Unsupported load parameter {config.load_params}')
@ -137,12 +141,14 @@ def _create_client(server: str, config: control_pb2.ClientConfig,
def _pick_an_unused_port() -> int:
    """Returns a TCP port number that was free at probe time.

    NOTE(review): the probing socket is closed before the port is
    returned, so another process could grab the port before the caller
    binds it — acceptable for a benchmark harness, but worth confirming.
    """
    probe_result = get_socket()
    probe_socket = probe_result[2]
    free_port = probe_result[1]
    probe_socket.close()
    return free_port
async def _create_sub_worker() -> _SubWorker:
"""Creates a child qps worker as a subprocess."""
port = _pick_an_unused_port()
_LOGGER.info('Creating sub worker at port [%d]...', port)
@ -202,8 +208,10 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
_LOGGER.info('Port picked [%d]', config.port)
if config.async_server_threads == 1:
# If async_server_threads == 1, start the server in this process.
await self._run_single_server(config, request_iterator, context)
else:
# If async_server_threads > 1, offload to other processes.
sub_workers = await asyncio.gather(*(
_create_sub_worker()
for _ in range(config.async_server_threads)))
@ -225,11 +233,12 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
config.port,
))
_LOGGER.info('Servers are ready to serve.')
async for request in request_iterator:
end_time = time.time()
for call in calls:
_LOGGER.debug('Fetching status...')
await call.write(request)
# Reports from sub workers doesn't matter
await call.read()
@ -293,8 +302,10 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
config.async_client_threads = _NUM_CORES
if config.async_client_threads == 1:
# If async_client_threads == 1, run the benchmark in this process.
await self._run_single_client(config, request_iterator, context)
else:
# If async_client_threads > 1, offload the work to other processes.
sub_workers = await asyncio.gather(*(
_create_sub_worker()
for _ in range(config.async_client_threads)))

Loading…
Cancel
Save