From efd0521731091f37de5d79ae007aeb7720cc4139 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 19 Aug 2020 20:50:41 +0000 Subject: [PATCH] Format --- .../interop/xds_interop_client.py | 77 ++++++++++++------- tools/run_tests/run_xds_tests.py | 8 +- 2 files changed, 58 insertions(+), 27 deletions(-) diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py index d3c0a3fd0ad..0c4d7ee387b 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py @@ -37,10 +37,14 @@ formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') console_handler.setFormatter(formatter) logger.addHandler(console_handler) -_SUPPORTED_METHODS = ("UnaryCall", "EmptyCall",) +_SUPPORTED_METHODS = ( + "UnaryCall", + "EmptyCall", +) PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]] + class _StatsWatcher: _start: int _end: int @@ -56,7 +60,8 @@ class _StatsWatcher: self._end = end self._rpcs_needed = end - start self._rpcs_by_peer = collections.defaultdict(int) - self._rpcs_by_method = collections.defaultdict(lambda: collections.defaultdict(int)) + self._rpcs_by_method = collections.defaultdict( + lambda: collections.defaultdict(int)) self._condition = threading.Condition() self._no_remote_peer = 0 @@ -125,8 +130,10 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer return response -def _start_rpc(method: str, metadata: Sequence[Tuple[str, str]], request_id: int, stub: test_pb2_grpc.TestServiceStub, - timeout: float, futures: Mapping[int, Tuple[grpc.Future, str]]) -> None: +def _start_rpc(method: str, metadata: Sequence[Tuple[str, str]], + request_id: int, stub: test_pb2_grpc.TestServiceStub, + timeout: float, + futures: Mapping[int, Tuple[grpc.Future, str]]) -> None: logger.info(f"Sending {method} request to backend: {request_id}") if method == "UnaryCall": future = stub.UnaryCall.future(messages_pb2.SimpleRequest(), @@ -140,8 +147,8 @@ def _start_rpc(method: str, metadata: Sequence[Tuple[str, str]], request_id: int raise ValueError(f"Unrecognized method '{method}'.") futures[request_id] = (future, method) -def _on_rpc_done(rpc_id: int, future: grpc.Future, - method: str, + +def _on_rpc_done(rpc_id: int, future: grpc.Future, method: str, print_response: bool) -> None: exception = future.exception() hostname = "" @@ -152,7 +159,10 @@ def _on_rpc_done(rpc_id: int, future: grpc.Future, logger.error(exception) else: response = future.result() - hostname_metadata = [metadatum for metadatum in future.initial_metadata() if metadatum[0] == "hostname"] + hostname_metadata = [ + metadatum for metadatum in future.initial_metadata() + if metadatum[0] == "hostname" + ] hostname = None if len(hostname_metadata) == 1: hostname = hostname_metadata[0][1] @@ -186,7 +196,9 @@ def _cancel_all_rpcs(futures: Mapping[int, Tuple[grpc.Future, str]]) -> None: future.cancel() -def _run_single_channel(method: str, metadata: Sequence[Tuple[str, str]], qps: int, server: str, rpc_timeout_sec: int, print_response: bool): +def _run_single_channel(method: str, metadata: Sequence[Tuple[str, str]], + qps: int, server: str, rpc_timeout_sec: int, + print_response: bool): global _global_rpc_id # pylint: disable=global-statement duration_per_query = 1.0 / float(qps) with grpc.insecure_channel(server) as channel: @@ -199,7 +211,8 @@ def _run_single_channel(method: str, metadata: Sequence[Tuple[str, str]], qps: i _global_rpc_id += 1 start = time.time() end = start + duration_per_query - _start_rpc(method, metadata, request_id, stub, float(rpc_timeout_sec), futures) + _start_rpc(method, metadata, request_id, stub, + float(rpc_timeout_sec), futures) _remove_completed_rpcs(futures, print_response) logger.debug(f"Currently {len(futures)} in-flight RPCs") now = time.time() @@ -208,16 +221,27 @@ def _run_single_channel(method: str, metadata: Sequence[Tuple[str, str]], qps: i now = time.time() _cancel_all_rpcs(futures) + class _MethodHandle: """An object grouping together threads driving RPCs for a method.""" _channel_threads: List[threading.Thread] - def __init__(self, method: str, metadata: Sequence[Tuple[str, str]], num_channels: int, qps: int, server: str, rpc_timeout_sec: int, print_response: bool): + def __init__(self, method: str, metadata: Sequence[Tuple[str, str]], + num_channels: int, qps: int, server: str, rpc_timeout_sec: int, + print_response: bool): """Creates and starts a group of threads running the indicated method.""" self._channel_threads = [] for i in range(num_channels): - thread = threading.Thread(target=_run_single_channel, args=(method, metadata, qps, server, rpc_timeout_sec, print_response,)) + thread = threading.Thread(target=_run_single_channel, + args=( + method, + metadata, + qps, + server, + rpc_timeout_sec, + print_response, + )) thread.start() self._channel_threads.append(thread) @@ -227,12 +251,16 @@ class _MethodHandle: channel_thread.join() -def _run(args: argparse.Namespace, methods: Sequence[str], per_method_metadata: PerMethodMetadataType) -> None: +def _run(args: argparse.Namespace, methods: Sequence[str], + per_method_metadata: PerMethodMetadataType) -> None: logger.info("Starting python xDS Interop Client.") global _global_server # pylint: disable=global-statement method_handles = [] for method in methods: - method_handles.append(_MethodHandle(method, per_method_metadata.get(method, []), args.num_channels, args.qps, args.server, args.rpc_timeout_sec, args.print_response)) + method_handles.append( + _MethodHandle(method, per_method_metadata.get(method, []), + args.num_channels, args.qps, args.server, + args.rpc_timeout_sec, args.print_response)) _global_server = grpc.server(futures.ThreadPoolExecutor()) _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}") test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server( @@ -283,17 +311,12 @@ if __name__ == "__main__": rpc_help = "A comma-delimited list of RPC methods to run. Must be one of " rpc_help += ", ".join(_SUPPORTED_METHODS) rpc_help += "." - parser.add_argument("--rpc", - default="UnaryCall", - type=str, - help=rpc_help) - metadata_help = ("A comma-delimited list of 3-tuples of the form " + - "METHOD:KEY:VALUE, e.g. " + - "EmptyCall:key1:value1,UnaryCall:key2:value2,EmptyCall:k3:v3") - parser.add_argument("--metadata", - default="", - type=str, - help=metadata_help) + parser.add_argument("--rpc", default="UnaryCall", type=str, help=rpc_help) + metadata_help = ( + "A comma-delimited list of 3-tuples of the form " + + "METHOD:KEY:VALUE, e.g. " + + "EmptyCall:key1:value1,UnaryCall:key2:value2,EmptyCall:k3:v3") + parser.add_argument("--metadata", default="", type=str, help=metadata_help) args = parser.parse_args() signal.signal(signal.SIGINT, _handle_sigint) if args.verbose: @@ -304,13 +327,15 @@ if __name__ == "__main__": logger.addHandler(file_handler) methods = args.rpc.split(",") if set(methods) - set(_SUPPORTED_METHODS): - raise ValueError("--rpc supported methods: {}".format(", ".join(_SUPPORTED_METHODS))) + raise ValueError("--rpc supported methods: {}".format( + ", ".join(_SUPPORTED_METHODS))) per_method_metadata = collections.defaultdict(list) metadata = args.metadata.split(",") if args.metadata else [] for metadatum in metadata: elems = metadatum.split(":") if len(elems) != 3: - raise ValueError(f"'{metadatum}' was not in the form 'METHOD:KEY:VALUE'") + raise ValueError( + f"'{metadatum}' was not in the form 'METHOD:KEY:VALUE'") if elems[0] not in _SUPPORTED_METHODS: raise ValueError(f"Unrecognized method '{elems[0]}'") per_method_metadata[elems[0]].append((elems[1], elems[2])) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 10a9e4ec133..d11a84cb400 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -1803,6 +1803,7 @@ try: stderr=subprocess.STDOUT, stdout=test_log_file) client_logged = threading.Event() + def _log_client(client_process, client_logged): # NOTE(rbellevi): Runs on another thread and logs the # client's output as soon as it terminates. This enables @@ -1826,7 +1827,12 @@ try: logger.info(client_output.read()) client_logged.set() - logging_thread = threading.Thread(target=_log_client, args=(client_process, client_logged,), daemon=True) + logging_thread = threading.Thread(target=_log_client, + args=( + client_process, + client_logged, + ), + daemon=True) logging_thread.start() if test_case == 'backends_restart': test_backends_restart(gcp, backend_service, instance_group)