pull/25142/head
Richard Belleville 4 years ago
parent 0be354eb51
commit 56ee26ee6c
  1. 48
      src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py

@ -60,6 +60,7 @@ PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]]
_CONFIG_CHANGE_TIMEOUT = datetime.timedelta(milliseconds=500) _CONFIG_CHANGE_TIMEOUT = datetime.timedelta(milliseconds=500)
class _StatsWatcher: class _StatsWatcher:
_start: int _start: int
_end: int _end: int
@ -147,15 +148,20 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer
logger.info("Returning stats response: {}".format(response)) logger.info("Returning stats response: {}".format(response))
return response return response
def GetClientAccumulatedStats(self, request: messages_pb2.LoadBalancerAccumulatedStatsRequest, context: grpc.ServicerContext): def GetClientAccumulatedStats(
self, request: messages_pb2.LoadBalancerAccumulatedStatsRequest,
context: grpc.ServicerContext):
logger.info("Received cumulative stats request.") logger.info("Received cumulative stats request.")
response = messages_pb2.LoadBalancerAccumulatedStatsResponse() response = messages_pb2.LoadBalancerAccumulatedStatsResponse()
with _global_lock: with _global_lock:
for method in _SUPPORTED_METHODS: for method in _SUPPORTED_METHODS:
caps_method = _METHOD_CAMEL_TO_CAPS_SNAKE[method] caps_method = _METHOD_CAMEL_TO_CAPS_SNAKE[method]
response.num_rpcs_started_by_method[caps_method] = _global_rpcs_started[method] response.num_rpcs_started_by_method[
response.num_rpcs_succeeded_by_method[caps_method] = _global_rpcs_succeeded[method] caps_method] = _global_rpcs_started[method]
response.num_rpcs_failed_by_method[caps_method] = _global_rpcs_failed[method] response.num_rpcs_succeeded_by_method[
caps_method] = _global_rpcs_succeeded[method]
response.num_rpcs_failed_by_method[
caps_method] = _global_rpcs_failed[method]
logger.info("Returning cumulative stats response.") logger.info("Returning cumulative stats response.")
return response return response
@ -231,6 +237,7 @@ def _cancel_all_rpcs(futures: Mapping[int, Tuple[grpc.Future, str]]) -> None:
for future, _ in futures.values(): for future, _ in futures.values():
future.cancel() future.cancel()
class _ChannelConfiguration: class _ChannelConfiguration:
"""Configuration for a single client channel. """Configuration for a single client channel.
@ -238,6 +245,7 @@ class _ChannelConfiguration:
data member should be accessed directly. This class is not thread-safe. data member should be accessed directly. This class is not thread-safe.
When accessing any of its members, the lock member should be held. When accessing any of its members, the lock member should be held.
""" """
def __init__(self, method: str, metadata: Sequence[Tuple[str, str]], def __init__(self, method: str, metadata: Sequence[Tuple[str, str]],
qps: int, server: str, rpc_timeout_sec: int, qps: int, server: str, rpc_timeout_sec: int,
print_response: bool): print_response: bool):
@ -251,6 +259,7 @@ class _ChannelConfiguration:
self.rpc_timeout_sec = rpc_timeout_sec self.rpc_timeout_sec = rpc_timeout_sec
self.print_response = print_response self.print_response = print_response
def _run_single_channel(config: _ChannelConfiguration): def _run_single_channel(config: _ChannelConfiguration):
global _global_rpc_id # pylint: disable=global-statement global _global_rpc_id # pylint: disable=global-statement
with config.condition: with config.condition:
@ -261,7 +270,8 @@ def _run_single_channel(config: _ChannelConfiguration):
while not _stop_event.is_set(): while not _stop_event.is_set():
with config.condition: with config.condition:
if config.qps == 0: if config.qps == 0:
config.condition.wait(timeout=_CONFIG_CHANGE_TIMEOUT.total_seconds()) config.condition.wait(
timeout=_CONFIG_CHANGE_TIMEOUT.total_seconds())
continue continue
else: else:
duration_per_query = 1.0 / float(config.qps) duration_per_query = 1.0 / float(config.qps)
@ -285,22 +295,27 @@ def _run_single_channel(config: _ChannelConfiguration):
_cancel_all_rpcs(futures) _cancel_all_rpcs(futures)
class _XdsUpdateClientConfigureServicer(test_pb2_grpc.XdsUpdateClientConfigureServiceServicer): class _XdsUpdateClientConfigureServicer(
test_pb2_grpc.XdsUpdateClientConfigureServiceServicer):
def __init__(self, per_method_configs: Mapping[str, _ChannelConfiguration], qps: int): def __init__(self, per_method_configs: Mapping[str, _ChannelConfiguration],
qps: int):
super(_XdsUpdateClientConfigureServicer).__init__() super(_XdsUpdateClientConfigureServicer).__init__()
self._per_method_configs = per_method_configs self._per_method_configs = per_method_configs
self._qps = qps self._qps = qps
def Configure(self, request: messages_pb2.ClientConfigureRequest, def Configure(self, request: messages_pb2.ClientConfigureRequest,
context: grpc.ServicerContext) -> messages_pb2.ClientConfigureResponse: context: grpc.ServicerContext
) -> messages_pb2.ClientConfigureResponse:
logging.info("Received Configure RPC: {}".format(request)) logging.info("Received Configure RPC: {}".format(request))
method_strs = (_METHOD_ENUM_TO_STR[t] for t in request.types) method_strs = (_METHOD_ENUM_TO_STR[t] for t in request.types)
for method in _SUPPORTED_METHODS: for method in _SUPPORTED_METHODS:
method_enum = _METHOD_STR_TO_ENUM[method] method_enum = _METHOD_STR_TO_ENUM[method]
if method in method_strs: if method in method_strs:
qps = self._qps qps = self._qps
metadata = ((md.key, md.value) for md in request.metadata if md.type == method_enum) metadata = ((md.key, md.value)
for md in request.metadata
if md.type == method_enum)
else: else:
qps = 0 qps = 0
metadata = () metadata = ()
@ -318,7 +333,8 @@ class _MethodHandle:
_channel_threads: List[threading.Thread] _channel_threads: List[threading.Thread]
def __init__(self, num_channels: int, channel_config: _ChannelConfiguration): def __init__(self, num_channels: int,
channel_config: _ChannelConfiguration):
"""Creates and starts a group of threads running the indicated method.""" """Creates and starts a group of threads running the indicated method."""
self._channel_threads = [] self._channel_threads = []
for i in range(num_channels): for i in range(num_channels):
@ -344,16 +360,18 @@ def _run(args: argparse.Namespace, methods: Sequence[str],
qps = args.qps qps = args.qps
else: else:
qps = 0 qps = 0
channel_config = _ChannelConfiguration(method, per_method_metadata.get(method, []), channel_config = _ChannelConfiguration(
qps, args.server, args.rpc_timeout_sec, args.print_response) method, per_method_metadata.get(method, []), qps, args.server,
args.rpc_timeout_sec, args.print_response)
channel_configs[method] = channel_config channel_configs[method] = channel_config
method_handles.append( method_handles.append(_MethodHandle(args.num_channels, channel_config))
_MethodHandle(args.num_channels, channel_config))
_global_server = grpc.server(futures.ThreadPoolExecutor()) _global_server = grpc.server(futures.ThreadPoolExecutor())
_global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}") _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}")
test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server( test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server(
_LoadBalancerStatsServicer(), _global_server) _LoadBalancerStatsServicer(), _global_server)
test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server(_XdsUpdateClientConfigureServicer(channel_configs, args.qps), _global_server) test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server(
_XdsUpdateClientConfigureServicer(channel_configs, args.qps),
_global_server)
_global_server.start() _global_server.start()
_global_server.wait_for_termination() _global_server.wait_for_termination()
for method_handle in method_handles: for method_handle in method_handles:

Loading…
Cancel
Save