From 057b34a4d056f40f7023274602783be5540b01c7 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 19 Aug 2020 00:20:43 +0000 Subject: [PATCH] Pull out function for running single method --- .../interop/xds_interop_client.py | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py index 4347f25f507..b671302cc04 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py @@ -193,23 +193,39 @@ def _run_single_channel(method: str, qps: int, server: str, rpc_timeout_sec: int now = time.time() _cancel_all_rpcs(futures) +class _MethodHandle: + """An object grouping together threads driving RPCs for a method.""" + + _channel_threads: List[threading.Thread] + + def __init__(self, method: str, num_channels: int, qps: int, server: str, rpc_timeout_sec: int, print_response: bool): + """Creates and starts a group of threads running the indicated method.""" + self._channel_threads = [] + for i in range(num_channels): + thread = threading.Thread(target=_run_single_channel, args=(method, qps, server, rpc_timeout_sec, print_response,)) + thread.start() + self._channel_threads.append(thread) + + def stop(self): + """Joins all threads referenced by the handle.""" + for channel_thread in self._channel_threads: + channel_thread.join() + def _run(args: argparse.Namespace) -> None: logger.info("Starting python xDS Interop Client.") global _global_server # pylint: disable=global-statement - channel_threads: List[threading.Thread] = [] - for i in range(args.num_channels): - thread = threading.Thread(target=_run_single_channel, args=('UnaryCall', args.qps, args.server, args.rpc_timeout_sec, args.print_response,)) - thread.start() - channel_threads.append(thread) + method_handles = [] + for method in ("UnaryCall",): + method_handles.append(_MethodHandle(method, args.num_channels, args.qps, args.server, args.rpc_timeout_sec, args.print_response)) _global_server = grpc.server(futures.ThreadPoolExecutor()) _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}") test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server( _LoadBalancerStatsServicer(), _global_server) _global_server.start() _global_server.wait_for_termination() - for i in range(args.num_channels): - thread.join() + for method_handle in method_handles: + method_handle.stop() if __name__ == "__main__":