From 6e3cc78375131e2b864e6a555104c99b1276cce5 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 18 Aug 2020 23:36:02 +0000 Subject: [PATCH] Abstract out the method being called --- .../interop/xds_interop_client.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py index bb6255c9120..4347f25f507 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py @@ -116,11 +116,17 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer return response -def _start_rpc(request_id: int, stub: test_pb2_grpc.TestServiceStub, +def _start_rpc(method: str, request_id: int, stub: test_pb2_grpc.TestServiceStub, timeout: float, futures: Mapping[int, grpc.Future]) -> None: logger.info(f"Sending request to backend: {request_id}") - future = stub.UnaryCall.future(messages_pb2.SimpleRequest(), - timeout=timeout) + if method == "UnaryCall": + future = stub.UnaryCall.future(messages_pb2.SimpleRequest(), + timeout=timeout) + elif method == "EmptyCall": + future = stub.EmptyCall.future(empty_pb2.Empty(), + timeout=timeout) + else: + raise ValueError(f"Unrecognized method '{method}'.") futures[request_id] = future @@ -165,7 +171,7 @@ def _cancel_all_rpcs(futures: Mapping[int, grpc.Future]) -> None: future.cancel() -def _run_single_channel(qps: int, server: str, rpc_timeout_sec: int, print_response: bool): +def _run_single_channel(method: str, qps: int, server: str, rpc_timeout_sec: int, print_response: bool): global _global_rpc_id # pylint: disable=global-statement duration_per_query = 1.0 / float(qps) with grpc.insecure_channel(server) as channel: @@ -178,7 +184,7 @@ def _run_single_channel(qps: int, server: str, rpc_timeout_sec: int, print_respo _global_rpc_id += 1 start = time.time() end = start + duration_per_query - _start_rpc(request_id, stub, float(rpc_timeout_sec), futures) + _start_rpc(method, request_id, stub, float(rpc_timeout_sec), futures) _remove_completed_rpcs(futures, print_response) logger.debug(f"Currently {len(futures)} in-flight RPCs") now = time.time() @@ -193,7 +199,7 @@ def _run(args: argparse.Namespace) -> None: global _global_server # pylint: disable=global-statement channel_threads: List[threading.Thread] = [] for i in range(args.num_channels): - thread = threading.Thread(target=_run_single_channel, args=(args.qps, args.server, args.rpc_timeout_sec, args.print_response,)) + thread = threading.Thread(target=_run_single_channel, args=('UnaryCall', args.qps, args.server, args.rpc_timeout_sec, args.print_response,)) thread.start() channel_threads.append(thread) _global_server = grpc.server(futures.ThreadPoolExecutor())