diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py index c846576765c..35ecf4e6905 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py @@ -60,6 +60,7 @@ PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]] _CONFIG_CHANGE_TIMEOUT = datetime.timedelta(milliseconds=500) + class _StatsWatcher: _start: int _end: int @@ -147,15 +148,20 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer logger.info("Returning stats response: {}".format(response)) return response - def GetClientAccumulatedStats(self, request: messages_pb2.LoadBalancerAccumulatedStatsRequest, context: grpc.ServicerContext): + def GetClientAccumulatedStats( + self, request: messages_pb2.LoadBalancerAccumulatedStatsRequest, + context: grpc.ServicerContext): logger.info("Received cumulative stats request.") response = messages_pb2.LoadBalancerAccumulatedStatsResponse() with _global_lock: for method in _SUPPORTED_METHODS: caps_method = _METHOD_CAMEL_TO_CAPS_SNAKE[method] - response.num_rpcs_started_by_method[caps_method] = _global_rpcs_started[method] - response.num_rpcs_succeeded_by_method[caps_method] = _global_rpcs_succeeded[method] - response.num_rpcs_failed_by_method[caps_method] = _global_rpcs_failed[method] + response.num_rpcs_started_by_method[ + caps_method] = _global_rpcs_started[method] + response.num_rpcs_succeeded_by_method[ + caps_method] = _global_rpcs_succeeded[method] + response.num_rpcs_failed_by_method[ + caps_method] = _global_rpcs_failed[method] logger.info("Returning cumulative stats response.") return response @@ -231,6 +237,7 @@ def _cancel_all_rpcs(futures: Mapping[int, Tuple[grpc.Future, str]]) -> None: for future, _ in futures.values(): future.cancel() + class _ChannelConfiguration: """Configuration for a single client channel. @@ -238,6 +245,7 @@ class _ChannelConfiguration: data member should be accessed directly. This class is not thread-safe. When accessing any of its members, the lock member should be held. """ + def __init__(self, method: str, metadata: Sequence[Tuple[str, str]], qps: int, server: str, rpc_timeout_sec: int, print_response: bool): @@ -251,6 +259,7 @@ class _ChannelConfiguration: self.rpc_timeout_sec = rpc_timeout_sec self.print_response = print_response + def _run_single_channel(config: _ChannelConfiguration): global _global_rpc_id # pylint: disable=global-statement with config.condition: @@ -261,7 +270,8 @@ def _run_single_channel(config: _ChannelConfiguration): while not _stop_event.is_set(): with config.condition: if config.qps == 0: - config.condition.wait(timeout=_CONFIG_CHANGE_TIMEOUT.total_seconds()) + config.condition.wait( + timeout=_CONFIG_CHANGE_TIMEOUT.total_seconds()) continue else: duration_per_query = 1.0 / float(config.qps) @@ -285,22 +295,27 @@ def _run_single_channel(config: _ChannelConfiguration): _cancel_all_rpcs(futures) -class _XdsUpdateClientConfigureServicer(test_pb2_grpc.XdsUpdateClientConfigureServiceServicer): +class _XdsUpdateClientConfigureServicer( + test_pb2_grpc.XdsUpdateClientConfigureServiceServicer): - def __init__(self, per_method_configs: Mapping[str, _ChannelConfiguration], qps: int): + def __init__(self, per_method_configs: Mapping[str, _ChannelConfiguration], + qps: int): super(_XdsUpdateClientConfigureServicer).__init__() self._per_method_configs = per_method_configs self._qps = qps def Configure(self, request: messages_pb2.ClientConfigureRequest, - context: grpc.ServicerContext) -> messages_pb2.ClientConfigureResponse: + context: grpc.ServicerContext + ) -> messages_pb2.ClientConfigureResponse: logging.info("Received Configure RPC: {}".format(request)) method_strs = (_METHOD_ENUM_TO_STR[t] for t in request.types) for method in _SUPPORTED_METHODS: method_enum = _METHOD_STR_TO_ENUM[method] if method in method_strs: qps = self._qps - metadata = ((md.key, md.value) for md in request.metadata if md.type == method_enum) + metadata = ((md.key, md.value) + for md in request.metadata + if md.type == method_enum) else: qps = 0 metadata = () @@ -318,7 +333,8 @@ class _MethodHandle: _channel_threads: List[threading.Thread] - def __init__(self, num_channels: int, channel_config: _ChannelConfiguration): + def __init__(self, num_channels: int, + channel_config: _ChannelConfiguration): """Creates and starts a group of threads running the indicated method.""" self._channel_threads = [] for i in range(num_channels): @@ -338,22 +354,24 @@ def _run(args: argparse.Namespace, methods: Sequence[str], logger.info("Starting python xDS Interop Client.") global _global_server # pylint: disable=global-statement method_handles = [] - channel_configs = {} + channel_configs = {} for method in _SUPPORTED_METHODS: if method in methods: qps = args.qps else: qps = 0 - channel_config = _ChannelConfiguration(method, per_method_metadata.get(method, []), - qps, args.server, args.rpc_timeout_sec, args.print_response) + channel_config = _ChannelConfiguration( + method, per_method_metadata.get(method, []), qps, args.server, + args.rpc_timeout_sec, args.print_response) channel_configs[method] = channel_config - method_handles.append( - _MethodHandle(args.num_channels, channel_config)) + method_handles.append(_MethodHandle(args.num_channels, channel_config)) _global_server = grpc.server(futures.ThreadPoolExecutor()) _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}") test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server( _LoadBalancerStatsServicer(), _global_server) - test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server(_XdsUpdateClientConfigureServicer(channel_configs, args.qps), _global_server) + test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server( + _XdsUpdateClientConfigureServicer(channel_configs, args.qps), + _global_server) _global_server.start() _global_server.wait_for_termination() for method_handle in method_handles: