Merge pull request #6524 from kpayson64/qps_improvements

Added true async qps streaming client
pull/6534/head
Jan Tattermusch 9 years ago
commit 594ff2054a
  1. 60
      src/python/grpcio/tests/qps/benchmark_client.py
  2. 2
      src/python/grpcio/tests/qps/client_runner.py
  3. 5
      src/python/grpcio/tests/qps/worker_server.py
  4. 49
      tools/run_tests/performance/scenario_config.py

@ -39,6 +39,7 @@ except ImportError:
from concurrent import futures from concurrent import futures
from grpc.beta import implementations from grpc.beta import implementations
from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2 from src.proto.grpc.testing import services_pb2
from tests.unit import resources from tests.unit import resources
@ -141,10 +142,10 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
self._stub = None self._stub = None
class StreamingAsyncBenchmarkClient(BenchmarkClient): class StreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist): def __init__(self, server, config, hist):
super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
self._is_streaming = False self._is_streaming = False
self._pool = futures.ThreadPoolExecutor(max_workers=1) self._pool = futures.ThreadPoolExecutor(max_workers=1)
# Use a thread-safe queue to put requests on the stream # Use a thread-safe queue to put requests on the stream
@ -167,12 +168,12 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
def _request_stream(self): def _request_stream(self):
self._is_streaming = True self._is_streaming = True
if self._generic: if self._generic:
response_stream = self._stub.inline_stream_stream( stream_callable = self._stub.stream_stream(
'grpc.testing.BenchmarkService', 'StreamingCall', 'grpc.testing.BenchmarkService', 'StreamingCall')
self._request_generator(), _TIMEOUT)
else: else:
response_stream = self._stub.StreamingCall(self._request_generator(), stream_callable = self._stub.StreamingCall
_TIMEOUT)
response_stream = stream_callable(self._request_generator(), _TIMEOUT)
for _ in response_stream: for _ in response_stream:
end_time = time.time() end_time = time.time()
self._handle_response(end_time - self._send_time_queue.get_nowait()) self._handle_response(end_time - self._send_time_queue.get_nowait())
@ -184,3 +185,48 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
yield request yield request
except queue.Empty: except queue.Empty:
pass pass
class AsyncReceiver(face.ResponseReceiver):
"""Receiver for async stream responses."""
def __init__(self, send_time_queue, response_handler):
self._send_time_queue = send_time_queue
self._response_handler = response_handler
def initial_metadata(self, initial_mdetadata):
pass
def response(self, response):
end_time = time.time()
self._response_handler(end_time - self._send_time_queue.get_nowait())
def complete(self, terminal_metadata, code, details):
pass
class StreamingAsyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
self._send_time_queue = queue.Queue()
self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response)
self._rendezvous = None
def send_request(self):
if self._rendezvous is not None:
self._send_time_queue.put(time.time())
self._rendezvous.consume(self._request)
def start(self):
if self._generic:
stream_callable = self._stub.stream_stream(
'grpc.testing.BenchmarkService', 'StreamingCall')
else:
stream_callable = self._stub.StreamingCall
self._rendezvous = stream_callable.event(
self._receiver, lambda *args: None, _TIMEOUT)
def stop(self):
self._rendezvous.terminate()
self._rendezvous = None

@ -89,9 +89,9 @@ class ClosedLoopClientRunner(ClientRunner):
def start(self): def start(self):
self._is_running = True self._is_running = True
self._client.start()
for _ in xrange(self._request_count): for _ in xrange(self._request_count):
self._client.send_request() self._client.send_request()
self._client.start()
def stop(self): def stop(self):
self._is_running = False self._is_running = False

@ -146,8 +146,9 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if config.rpc_type == control_pb2.UNARY: if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnarySyncBenchmarkClient( client = benchmark_client.UnarySyncBenchmarkClient(
server, config, qps_data) server, config, qps_data)
else: elif config.rpc_type == control_pb2.STREAMING:
raise Exception('STREAMING SYNC client not supported') client = benchmark_client.StreamingSyncBenchmarkClient(
server, config, qps_data)
elif config.client_type == control_pb2.ASYNC_CLIENT: elif config.client_type == control_pb2.ASYNC_CLIENT:
if config.rpc_type == control_pb2.UNARY: if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient( client = benchmark_client.UnaryAsyncBenchmarkClient(

@ -349,39 +349,39 @@ class PythonLanguage:
return 500 return 500
def scenarios(self): def scenarios(self):
# TODO(jtattermusch): this scenario reports QPS 0.0 # TODO(issue #6522): Empty streaming requests does not work for python
yield _ping_pong_scenario(
'python_generic_async_streaming_ping_pong', rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
use_generic_payload=True,
categories=[SMOKETEST])
# TODO(jtattermusch): make this scenario work
#yield _ping_pong_scenario( #yield _ping_pong_scenario(
# 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', # 'python_generic_async_streaming_ping_pong', rpc_type='STREAMING',
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') # client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
# use_generic_payload=True,
# categories=[SMOKETEST])
# TODO(jtattermusch): make this scenario work yield _ping_pong_scenario(
#yield _ping_pong_scenario( 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
# 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY', client_type='ASYNC_CLIENT', server_type='SYNC_SERVER')
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
'python_protobuf_async_unary_ping_pong', rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='SYNC_SERVER')
yield _ping_pong_scenario( yield _ping_pong_scenario(
'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY', 'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER', client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
categories=[SMOKETEST]) categories=[SMOKETEST])
# TODO(jtattermusch): make this scenario work # TODO(jtattermusch):
# The qps_worker server gets thread starved with ~6400 threads, the GIL
# enforces that a single thread runs at a time, with no way to set thread
# priority. Re-evaluate after changing DEEP and WIDE.
#yield _ping_pong_scenario( #yield _ping_pong_scenario(
# 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', # 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
# client_type='SYNC_CLIENT', server_type='SYNC_SERVER', # client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
# use_unconstrained_client=True) # use_unconstrained_client=True)
# TODO(jtattermusch): make this scenario work yield _ping_pong_scenario(
#yield _ping_pong_scenario( 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
# 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='SYNC_SERVER',
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', use_unconstrained_client=True)
# use_unconstrained_client=True)
yield _ping_pong_scenario( yield _ping_pong_scenario(
'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
@ -389,11 +389,10 @@ class PythonLanguage:
server_language='c++', server_core_limit=1, async_server_threads=1, server_language='c++', server_core_limit=1, async_server_threads=1,
categories=[SMOKETEST]) categories=[SMOKETEST])
# TODO(jtattermusch): make this scenario work yield _ping_pong_scenario(
#yield _ping_pong_scenario( 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
# 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
# client_type='SYNC_CLIENT', server_type='SYNC_SERVER', server_language='c++', server_core_limit=1, async_server_threads=1)
# server_language='c++', server_core_limit=1, async_server_threads=1)
def __str__(self): def __str__(self):
return 'python' return 'python'

Loading…
Cancel
Save