diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py index 99c760b5662..c846576765c 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py @@ -44,6 +44,11 @@ _SUPPORTED_METHODS = ( "EmptyCall", ) +_METHOD_CAMEL_TO_CAPS_SNAKE = { + "UnaryCall": "UNARY_CALL", + "EmptyCall": "EMPTY_CALL", +} + _METHOD_STR_TO_ENUM = { "UnaryCall": messages_pb2.ClientConfigureRequest.UNARY_CALL, "EmptyCall": messages_pb2.ClientConfigureRequest.EMPTY_CALL, @@ -147,10 +152,11 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer response = messages_pb2.LoadBalancerAccumulatedStatsResponse() with _global_lock: for method in _SUPPORTED_METHODS: - response.num_rpcs_started_by_method[method] = _global_rpcs_started.get[method] - response.num_rpcs_succeeded_by_method[method] = _global_rpcs_succeeded.get[method] - response.num_rpcs_failed_by_method[method] = _global_rpcs_succeeded.get[method] - logger.info("Returning cumulative stats request.") + caps_method = _METHOD_CAMEL_TO_CAPS_SNAKE[method] + response.num_rpcs_started_by_method[caps_method] = _global_rpcs_started[method] + response.num_rpcs_succeeded_by_method[caps_method] = _global_rpcs_succeeded[method] + response.num_rpcs_failed_by_method[caps_method] = _global_rpcs_failed[method] + logger.info("Returning cumulative stats response.") return response @@ -184,8 +190,6 @@ def _on_rpc_done(rpc_id: int, future: grpc.Future, method: str, else: logger.error(exception) else: - with _global_lock: - _global_rpcs_succeeded[method] += 1 response = future.result() hostname = None for metadatum in future.initial_metadata(): @@ -194,6 +198,12 @@ def _on_rpc_done(rpc_id: int, future: grpc.Future, method: str, break else: hostname = response.hostname + if future.code() == grpc.StatusCode.OK: + with _global_lock: + _global_rpcs_succeeded[method] += 1 + else: + with _global_lock: + _global_rpcs_failed[method] += 1 if print_response: if future.code() == grpc.StatusCode.OK: logger.info("Successful response.") @@ -284,6 +294,7 @@ class _XdsUpdateClientConfigureServicer(test_pb2_grpc.XdsUpdateClientConfigureSe def Configure(self, request: messages_pb2.ClientConfigureRequest, context: grpc.ServicerContext) -> messages_pb2.ClientConfigureResponse: + logging.info("Received Configure RPC: {}".format(request)) method_strs = (_METHOD_ENUM_TO_STR[t] for t in request.types) for method in _SUPPORTED_METHODS: method_enum = _METHOD_STR_TO_ENUM[method] @@ -293,10 +304,10 @@ class _XdsUpdateClientConfigureServicer(test_pb2_grpc.XdsUpdateClientConfigureSe else: qps = 0 metadata = () - channel_config = self._per_method_config[method] + channel_config = self._per_method_configs[method] with channel_config.condition: channel_config.qps = qps - channel_config.metadata = metadata + channel_config.metadata = list(metadata) channel_config.condition.notify_all() # TODO: Wait for all channels to respond until responding to RPC? return messages_pb2.ClientConfigureResponse()