Add a special optimization for 1 rpc unary-stream sync

pull/24717/head
Lidi Zheng 4 years ago
parent cc03415b78
commit 89fa7203d0
  1. 20
      src/python/grpcio_tests/tests/qps/benchmark_client.py
  2. 11
      src/python/grpcio_tests/tests/qps/client_runner.py
  3. 4
      src/python/grpcio_tests/tests/qps/worker_server.py

@ -209,12 +209,21 @@ class ServerStreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
super(ServerStreamingSyncBenchmarkClient,
self).__init__(server, config, hist)
self._pool = futures.ThreadPoolExecutor(
max_workers=config.outstanding_rpcs_per_channel)
if config.outstanding_rpcs_per_channel == 1:
self._pool = None
else:
self._pool = futures.ThreadPoolExecutor(
max_workers=config.outstanding_rpcs_per_channel)
self._rpcs = []
self._sender = None
def send_request(self):
self._pool.submit(self._one_stream_streaming_rpc)
if self._pool is None:
self._sender = threading.Thread(
target=self._one_stream_streaming_rpc, daemon=True)
self._sender.start()
else:
self._pool.submit(self._one_stream_streaming_rpc)
def _one_stream_streaming_rpc(self):
response_stream = self._stub.StreamingFromServer(
@ -228,5 +237,8 @@ class ServerStreamingSyncBenchmarkClient(BenchmarkClient):
def stop(self):
for call in self._rpcs:
call.cancel()
self._pool.shutdown(wait=False)
if self._sender is not None:
self._sender.join()
if self._pool is not None:
self._pool.shutdown(wait=False)
self._stub = None

@ -67,12 +67,15 @@ class OpenLoopClientRunner(ClientRunner):
class ClosedLoopClientRunner(ClientRunner):
def __init__(self, client, request_count):
def __init__(self, client, request_count, no_ping_pong):
super(ClosedLoopClientRunner, self).__init__(client)
self._is_running = False
self._request_count = request_count
# Send a new request on each response for closed loop
self._client.add_response_callback(self._send_request)
# For server-streaming RPC, don't spawn new RPC after each responses.
# This yield at most ~17% for single RPC scenarios.
if not no_ping_pong:
# Send a new request on each response for closed loop
self._client.add_response_callback(self._send_request)
def start(self):
self._is_running = True
@ -85,6 +88,6 @@ class ClosedLoopClientRunner(ClientRunner):
self._client.stop()
self._client = None
def _send_request(self, client, response_time):
def _send_request(self, client, unused_response_time):
if self._is_running:
client.send_request()

@ -148,6 +148,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer):
return control_pb2.ClientStatus(stats=stats)
def _create_client_runner(self, server, config, qps_data):
no_ping_pong = False
if config.client_type == control_pb2.SYNC_CLIENT:
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnarySyncBenchmarkClient(
@ -156,6 +157,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer):
client = benchmark_client.StreamingSyncBenchmarkClient(
server, config, qps_data)
elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER:
no_ping_pong = True
client = benchmark_client.ServerStreamingSyncBenchmarkClient(
server, config, qps_data)
elif config.client_type == control_pb2.ASYNC_CLIENT:
@ -172,7 +174,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer):
load_factor = float(config.client_channels)
if config.load_params.WhichOneof('load') == 'closed_loop':
runner = client_runner.ClosedLoopClientRunner(
client, config.outstanding_rpcs_per_channel)
client, config.outstanding_rpcs_per_channel, no_ping_pong)
else: # Open loop Poisson
alpha = config.load_params.poisson.offered_load / load_factor

Loading…
Cancel
Save