Add support for unary-stream benchmarking for Python

pull/24576/head
Lidi Zheng 4 years ago
parent e7e1cdb128
commit 36b46b104a
  1. 30
      src/python/grpcio_tests/tests/qps/benchmark_client.py
  2. 2
      src/python/grpcio_tests/tests/qps/qps_worker.py
  3. 3
      src/python/grpcio_tests/tests/qps/worker_server.py
  4. 31
      src/python/grpcio_tests/tests_aio/benchmark/benchmark_client.py
  5. 2
      src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py

@ -34,6 +34,8 @@ class GenericStub(object):
def __init__(self, channel):
self.UnaryCall = channel.unary_unary(
'/grpc.testing.BenchmarkService/UnaryCall')
self.StreamingFromServer = channel.unary_stream(
'/grpc.testing.BenchmarkService/StreamingFromServer')
self.StreamingCall = channel.stream_stream(
'/grpc.testing.BenchmarkService/StreamingCall')
@ -200,3 +202,31 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
stream.stop()
self._pool.shutdown(wait=True)
self._stub = None
class ServerStreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
super(ServerStreamingSyncBenchmarkClient,
self).__init__(server, config, hist)
self._pool = futures.ThreadPoolExecutor(
max_workers=config.outstanding_rpcs_per_channel)
self._rpcs = []
def send_request(self):
self._pool.submit(self._one_stream_streamming_rpc)
def _one_stream_streamming_rpc(self):
response_stream = self._stub.StreamingFromServer(
self._request, _TIMEOUT)
self._rpcs.append(response_stream)
start_time = time.time()
for _ in response_stream:
self._handle_response(self, time.time() - start_time)
start_time = time.time()
def stop(self):
for call in self._rpcs:
call.cancel()
self._pool.shutdown(wait=False)
self._stub = None

@ -14,6 +14,7 @@
"""The entry point for the qps worker."""
import argparse
import logging
import time
import grpc
@ -35,6 +36,7 @@ def run_worker_server(driver_port, server_port):
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser(
description='gRPC Python performance testing worker')
parser.add_argument('--driver_port',

@ -151,6 +151,9 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer):
elif config.rpc_type == control_pb2.STREAMING:
client = benchmark_client.StreamingSyncBenchmarkClient(
server, config, qps_data)
elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER:
client = benchmark_client.ServerStreamingSyncBenchmarkClient(
server, config, qps_data)
elif config.client_type == control_pb2.ASYNC_CLIENT:
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient(

@ -33,6 +33,8 @@ class GenericStub(object):
def __init__(self, channel: aio.Channel):
self.UnaryCall = channel.unary_unary(
'/grpc.testing.BenchmarkService/UnaryCall')
self.StreamingFromServer = channel.unary_stream(
'/grpc.testing.BenchmarkService/StreamingFromServer')
self.StreamingCall = channel.stream_stream(
'/grpc.testing.BenchmarkService/StreamingCall')
@ -153,3 +155,32 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
self._running = False
await self._stopped.wait()
await super().stop()
class ServerStreamingAsyncBenchmarkClient(BenchmarkClient):
def __init__(self, address: str, config: control_pb2.ClientConfig,
hist: histogram.Histogram):
super().__init__(address, config, hist)
self._running = None
self._stopped = asyncio.Event()
async def _one_server_streamming_call(self):
call = self._stub.StreamingFromServer(self._request)
while self._running:
start_time = time.time()
await call.read()
self._record_query_time(time.time() - start_time)
async def run(self):
await super().run()
self._running = True
senders = (self._one_server_streamming_call()
for _ in range(self._concurrency))
await asyncio.gather(*senders)
self._stopped.set()
async def stop(self):
self._running = False
await self._stopped.wait()
await super().stop()

@ -133,6 +133,8 @@ def _create_client(server: str, config: control_pb2.ClientConfig,
client_type = benchmark_client.UnaryAsyncBenchmarkClient
elif config.rpc_type == control_pb2.STREAMING:
client_type = benchmark_client.StreamingAsyncBenchmarkClient
elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER:
client_type = benchmark_client.ServerStreamingAsyncBenchmarkClient
else:
raise NotImplementedError(
f'Unsupported rpc_type [{config.rpc_type}]')

Loading…
Cancel
Save