|
|
|
@ -118,6 +118,10 @@ _global_rpcs_started: Mapping[str, int] = collections.defaultdict(int) |
|
|
|
|
_global_rpcs_succeeded: Mapping[str, int] = collections.defaultdict(int) |
|
|
|
|
_global_rpcs_failed: Mapping[str, int] = collections.defaultdict(int) |
|
|
|
|
|
|
|
|
|
# Mapping[method, Mapping[status_code, count]] |
|
|
|
|
_global_rpc_statuses: Mapping[str, Mapping[int, int]] = collections.defaultdict( |
|
|
|
|
lambda: collections.defaultdict(int)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_sigint(sig, frame) -> None: |
|
|
|
|
_stop_event.set() |
|
|
|
@ -164,6 +168,10 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer |
|
|
|
|
caps_method] = _global_rpcs_succeeded[method] |
|
|
|
|
response.num_rpcs_failed_by_method[ |
|
|
|
|
caps_method] = _global_rpcs_failed[method] |
|
|
|
|
response.stats_per_method[ |
|
|
|
|
caps_method].rpcs_started = _global_rpcs_started[method] |
|
|
|
|
for code, count in _global_rpc_statuses[method].items(): |
|
|
|
|
response.stats_per_method[caps_method].result[code] = count |
|
|
|
|
logger.info("Returning cumulative stats response.") |
|
|
|
|
return response |
|
|
|
|
|
|
|
|
@ -190,6 +198,7 @@ def _on_rpc_done(rpc_id: int, future: grpc.Future, method: str, |
|
|
|
|
print_response: bool) -> None: |
|
|
|
|
exception = future.exception() |
|
|
|
|
hostname = "" |
|
|
|
|
_global_rpc_statuses[method][future.code().value[0]] += 1 |
|
|
|
|
if exception is not None: |
|
|
|
|
with _global_lock: |
|
|
|
|
_global_rpcs_failed[method] += 1 |
|
|
|
@ -314,18 +323,27 @@ class _XdsUpdateClientConfigureServicer( |
|
|
|
|
method_strs = (_METHOD_ENUM_TO_STR[t] for t in request.types) |
|
|
|
|
for method in _SUPPORTED_METHODS: |
|
|
|
|
method_enum = _METHOD_STR_TO_ENUM[method] |
|
|
|
|
channel_config = self._per_method_configs[method] |
|
|
|
|
if method in method_strs: |
|
|
|
|
qps = self._qps |
|
|
|
|
metadata = ((md.key, md.value) |
|
|
|
|
for md in request.metadata |
|
|
|
|
if md.type == method_enum) |
|
|
|
|
# For backward compatibility, do not change timeout when we |
|
|
|
|
# receive a default value timeout. |
|
|
|
|
if request.timeout_secs == 0: |
|
|
|
|
timeout_sec = channel_config.rpc_timeout_sec |
|
|
|
|
else: |
|
|
|
|
timeout_sec = request.timeout_sec |
|
|
|
|
else: |
|
|
|
|
qps = 0 |
|
|
|
|
metadata = () |
|
|
|
|
channel_config = self._per_method_configs[method] |
|
|
|
|
# Leave timeout unchanged for backward compatibility. |
|
|
|
|
timeout_sec = channel_config.rpc_timeout_sec |
|
|
|
|
with channel_config.condition: |
|
|
|
|
channel_config.qps = qps |
|
|
|
|
channel_config.metadata = list(metadata) |
|
|
|
|
channel_config.rpc_timeout_sec = timeout_sec |
|
|
|
|
channel_config.condition.notify_all() |
|
|
|
|
return messages_pb2.ClientConfigureResponse() |
|
|
|
|
|
|
|
|
|