From 89fa7203d039ae969578ab4b01377a1d444d7224 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 10 Nov 2020 12:27:47 -0800 Subject: [PATCH] Add a special optimization for 1 rpc unary-stream sync --- .../tests/qps/benchmark_client.py | 20 +++++++++++++++---- .../grpcio_tests/tests/qps/client_runner.py | 11 ++++++---- .../grpcio_tests/tests/qps/worker_server.py | 4 +++- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 461c71499e7..29cd35efb17 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -209,12 +209,21 @@ class ServerStreamingSyncBenchmarkClient(BenchmarkClient): def __init__(self, server, config, hist): super(ServerStreamingSyncBenchmarkClient, self).__init__(server, config, hist) - self._pool = futures.ThreadPoolExecutor( - max_workers=config.outstanding_rpcs_per_channel) + if config.outstanding_rpcs_per_channel == 1: + self._pool = None + else: + self._pool = futures.ThreadPoolExecutor( + max_workers=config.outstanding_rpcs_per_channel) self._rpcs = [] + self._sender = None def send_request(self): - self._pool.submit(self._one_stream_streaming_rpc) + if self._pool is None: + self._sender = threading.Thread( + target=self._one_stream_streaming_rpc, daemon=True) + self._sender.start() + else: + self._pool.submit(self._one_stream_streaming_rpc) def _one_stream_streaming_rpc(self): response_stream = self._stub.StreamingFromServer( @@ -228,5 +237,8 @@ class ServerStreamingSyncBenchmarkClient(BenchmarkClient): def stop(self): for call in self._rpcs: call.cancel() - self._pool.shutdown(wait=False) + if self._sender is not None: + self._sender.join() + if self._pool is not None: + self._pool.shutdown(wait=False) self._stub = None diff --git a/src/python/grpcio_tests/tests/qps/client_runner.py b/src/python/grpcio_tests/tests/qps/client_runner.py index c5d299f6463..a03174472c0 100644 --- a/src/python/grpcio_tests/tests/qps/client_runner.py +++ b/src/python/grpcio_tests/tests/qps/client_runner.py @@ -67,12 +67,15 @@ class OpenLoopClientRunner(ClientRunner): class ClosedLoopClientRunner(ClientRunner): - def __init__(self, client, request_count): + def __init__(self, client, request_count, no_ping_pong): super(ClosedLoopClientRunner, self).__init__(client) self._is_running = False self._request_count = request_count - # Send a new request on each response for closed loop - self._client.add_response_callback(self._send_request) + # For server-streaming RPC, don't spawn new RPC after each responses. + # This yield at most ~17% for single RPC scenarios. + if not no_ping_pong: + # Send a new request on each response for closed loop + self._client.add_response_callback(self._send_request) def start(self): self._is_running = True @@ -85,6 +88,6 @@ class ClosedLoopClientRunner(ClientRunner): self._client.stop() self._client = None - def _send_request(self, client, response_time): + def _send_request(self, client, unused_response_time): if self._is_running: client.send_request() diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index 5132dd8fa5b..327b8e3b4c0 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -148,6 +148,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): return control_pb2.ClientStatus(stats=stats) def _create_client_runner(self, server, config, qps_data): + no_ping_pong = False if config.client_type == control_pb2.SYNC_CLIENT: if config.rpc_type == control_pb2.UNARY: client = benchmark_client.UnarySyncBenchmarkClient( @@ -156,6 +157,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): client = benchmark_client.StreamingSyncBenchmarkClient( server, config, qps_data) elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER: + no_ping_pong = True client = benchmark_client.ServerStreamingSyncBenchmarkClient( server, config, qps_data) elif config.client_type == control_pb2.ASYNC_CLIENT: @@ -172,7 +174,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): load_factor = float(config.client_channels) if config.load_params.WhichOneof('load') == 'closed_loop': runner = client_runner.ClosedLoopClientRunner( - client, config.outstanding_rpcs_per_channel) + client, config.outstanding_rpcs_per_channel, no_ping_pong) else: # Open loop Poisson alpha = config.load_params.poisson.offered_load / load_factor