diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index b372ea01ade..aac218ed819 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -185,48 +185,3 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): yield request except queue.Empty: pass - - -class AsyncReceiver(face.ResponseReceiver): - """Receiver for async stream responses.""" - - def __init__(self, send_time_queue, response_handler): - self._send_time_queue = send_time_queue - self._response_handler = response_handler - - def initial_metadata(self, initial_mdetadata): - pass - - def response(self, response): - end_time = time.time() - self._response_handler(end_time - self._send_time_queue.get_nowait()) - - def complete(self, terminal_metadata, code, details): - pass - - -class StreamingAsyncBenchmarkClient(BenchmarkClient): - - def __init__(self, server, config, hist): - super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) - self._send_time_queue = queue.Queue() - self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response) - self._rendezvous = None - - def send_request(self): - if self._rendezvous is not None: - self._send_time_queue.put(time.time()) - self._rendezvous.consume(self._request) - - def start(self): - if self._generic: - stream_callable = self._stub.stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall') - else: - stream_callable = self._stub.StreamingCall - self._rendezvous = stream_callable.event( - self._receiver, lambda *args: None, _TIMEOUT) - - def stop(self): - self._rendezvous.terminate() - self._rendezvous = None diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py index 3dda718638e..16926379a5b 100644 --- a/src/python/grpcio/tests/qps/qps_worker.py +++ b/src/python/grpcio/tests/qps/qps_worker.py @@ -43,9 +43,7 @@ def run_worker_server(port): server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() - # Drain outstanding requests for clean exit - time.sleep(2) - server.stop(0) + server.stop(2) if __name__ == '__main__': diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py index 1f9af5482cc..d41f8377c2a 100644 --- a/src/python/grpcio/tests/qps/worker_server.py +++ b/src/python/grpcio/tests/qps/worker_server.py @@ -153,9 +153,8 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): if config.rpc_type == control_pb2.UNARY: client = benchmark_client.UnaryAsyncBenchmarkClient( server, config, qps_data) - elif config.rpc_type == control_pb2.STREAMING: - client = benchmark_client.StreamingAsyncBenchmarkClient( - server, config, qps_data) + else: + raise Exception('Async streaming client not supported') else: raise Exception('Unsupported client type {}'.format(config.client_type)) diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index b55d728d840..81569e8b7a4 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -395,8 +395,8 @@ class PythonLanguage: # categories=[SMOKETEST]) yield _ping_pong_scenario( - 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='SYNC_SERVER') + 'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER') yield _ping_pong_scenario( 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY', @@ -413,8 +413,8 @@ class PythonLanguage: unconstrained_client='sync') yield _ping_pong_scenario( - 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='SYNC_SERVER', + 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER', unconstrained_client='async') yield _ping_pong_scenario(