diff --git a/setup.cfg b/setup.cfg index 58aeee79525..d54017af909 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,6 +25,6 @@ inputs = src/python/grpcio_tests/tests_aio # NOTE(lidiz) -# import-error: "Can't find module 'grpc._cython.cygrpc'." +# import-error: C extension triggers import-error. # module-attr: pytype cannot understand the namespace packages by Google. disable = "import-error,module-attr" diff --git a/src/python/grpcio_tests/tests_aio/interop/BUILD.bazel b/src/python/grpcio_tests/tests_aio/interop/BUILD.bazel index 8dfa744805d..b5bbdc6df4e 100644 --- a/src/python/grpcio_tests/tests_aio/interop/BUILD.bazel +++ b/src/python/grpcio_tests/tests_aio/interop/BUILD.bazel @@ -14,10 +14,7 @@ load("@grpc_python_dependencies//:requirements.bzl", "requirement") -package( - default_testonly = 1, - default_visibility = ["//visibility:public"], -) +package(default_testonly = 1) py_library( name = "methods", diff --git a/src/python/grpcio_tests/tests_aio/interop/client.py b/src/python/grpcio_tests/tests_aio/interop/client.py index 3ad548d5210..905effa45bf 100644 --- a/src/python/grpcio_tests/tests_aio/interop/client.py +++ b/src/python/grpcio_tests/tests_aio/interop/client.py @@ -23,13 +23,12 @@ from grpc.experimental import aio from tests.interop import client as interop_client_lib from tests_aio.interop import methods -logging.basicConfig(level=logging.DEBUG) _LOGGER = logging.getLogger(__name__) _LOGGER.setLevel(logging.DEBUG) def _create_channel(args): - target = '{}:{}'.format(args.server_host, args.server_port) + target = f'{args.server_host}:{args.server_port}' if args.use_tls: channel_credentials, options = interop_client_lib.get_secure_channel_parameters( @@ -54,9 +53,10 @@ async def test_interoperability(): channel = _create_channel(args) stub = interop_client_lib.create_stub(channel, args) test_case = _test_case_from_arg(args.test_case) - await test_case.test_interoperability(stub, args) + await methods.test_interoperability(test_case, stub, args) if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) asyncio.get_event_loop().set_debug(True) asyncio.get_event_loop().run_until_complete(test_interoperability()) diff --git a/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py b/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py index e19fdde92b0..d838b7beda6 100644 --- a/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py +++ b/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py @@ -37,36 +37,37 @@ class InteropTestCaseMixin: _stub: test_pb2_grpc.TestServiceStub async def test_empty_unary(self): - await methods.TestCase.EMPTY_UNARY.test_interoperability( - self._stub, None) + await methods.test_interoperability(methods.TestCase.EMPTY_UNARY, + self._stub, None) async def test_large_unary(self): - await methods.TestCase.LARGE_UNARY.test_interoperability( - self._stub, None) + await methods.test_interoperability(methods.TestCase.LARGE_UNARY, + self._stub, None) async def test_server_streaming(self): - await methods.TestCase.SERVER_STREAMING.test_interoperability( - self._stub, None) + await methods.test_interoperability(methods.TestCase.SERVER_STREAMING, + self._stub, None) async def test_client_streaming(self): - await methods.TestCase.CLIENT_STREAMING.test_interoperability( - self._stub, None) + await methods.test_interoperability(methods.TestCase.CLIENT_STREAMING, + self._stub, None) async def test_ping_pong(self): - await methods.TestCase.PING_PONG.test_interoperability(self._stub, None) + await methods.test_interoperability(methods.TestCase.PING_PONG, + self._stub, None) async def test_cancel_after_begin(self): - await methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability( - self._stub, None) + await methods.test_interoperability(methods.TestCase.CANCEL_AFTER_BEGIN, + self._stub, None) async def test_cancel_after_first_response(self): - await methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability( - self._stub, None) + await methods.test_interoperability( + methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE, self._stub, None) @unittest.skip('TODO(https://github.com/grpc/grpc/issues/21707)') async def test_timeout_on_sleeping_server(self): - await methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability( - self._stub, None) + await methods.test_interoperability( + methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER, self._stub, None) class InsecureLocalInteropTest(InteropTestCaseMixin, AioTestBase): diff --git a/src/python/grpcio_tests/tests_aio/interop/methods.py b/src/python/grpcio_tests/tests_aio/interop/methods.py index c8697a4d326..8a5875fdd32 100644 --- a/src/python/grpcio_tests/tests_aio/interop/methods.py +++ b/src/python/grpcio_tests/tests_aio/interop/methods.py @@ -13,20 +13,23 @@ # limitations under the License. """Implementations of interoperability test methods.""" -import enum +import argparse import asyncio -from typing import Any, Union, Optional +import enum +import collections +import inspect import json import os import threading import time +from typing import Any, Optional, Union import grpc -from grpc.experimental import aio from google import auth as google_auth from google.auth import environment_vars as google_auth_environment_vars from google.auth.transport import grpc as google_auth_transport_grpc from google.auth.transport import requests as google_auth_transport_requests +from grpc.experimental import aio from src.proto.grpc.testing import empty_pb2, messages_pb2, test_pb2_grpc @@ -311,14 +314,16 @@ async def _custom_metadata(stub: test_pb2_grpc.TestServiceStub): await _validate_metadata(call) -async def _compute_engine_creds(stub: test_pb2_grpc.TestServiceStub, args): +async def _compute_engine_creds(stub: test_pb2_grpc.TestServiceStub, + args: argparse.Namespace): response = await _large_unary_common_behavior(stub, True, True, None) if args.default_service_account != response.username: raise ValueError('expected username %s, got %s' % (args.default_service_account, response.username)) -async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub, args): +async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub, + args: argparse.Namespace): json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] response = await _large_unary_common_behavior(stub, True, True, None) @@ -331,7 +336,7 @@ async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub, args): response.oauth_scope, args.oauth_scope)) -async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub, unused_args): +async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub): json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] response = await _large_unary_common_behavior(stub, True, False, None) @@ -340,7 +345,8 @@ async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub, unused_args): (wanted_email, response.username)) -async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub, args): +async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub, + args: argparse.Namespace): json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] google_credentials, unused_project_id = google_auth.default( @@ -356,7 +362,8 @@ async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub, args): (wanted_email, response.username)) -async def _special_status_message(stub: test_pb2_grpc.TestServiceStub, args): +async def _special_status_message(stub: test_pb2_grpc.TestServiceStub, + args: argparse.Namespace): details = b'\t\ntest with whitespace\r\nand Unicode BMP \xe2\x98\xba and non-BMP \xf0\x9f\x98\x88\t\n'.decode( 'utf-8') code = 2 @@ -381,6 +388,7 @@ class TestCase(enum.Enum): PING_PONG = 'ping_pong' CANCEL_AFTER_BEGIN = 'cancel_after_begin' CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' + TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' EMPTY_STREAM = 'empty_stream' STATUS_CODE_AND_MESSAGE = 'status_code_and_message' UNIMPLEMENTED_METHOD = 'unimplemented_method' @@ -390,47 +398,46 @@ class TestCase(enum.Enum): OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' JWT_TOKEN_CREDS = 'jwt_token_creds' PER_RPC_CREDS = 'per_rpc_creds' - TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' SPECIAL_STATUS_MESSAGE = 'special_status_message' - async def test_interoperability(self, stub: test_pb2_grpc.TestServiceStub, - args) -> None: - if self is TestCase.EMPTY_UNARY: - await _empty_unary(stub) - elif self is TestCase.LARGE_UNARY: - await _large_unary(stub) - elif self is TestCase.SERVER_STREAMING: - await _server_streaming(stub) - elif self is TestCase.CLIENT_STREAMING: - await _client_streaming(stub) - elif self is TestCase.PING_PONG: - await _ping_pong(stub) - elif self is TestCase.CANCEL_AFTER_BEGIN: - await _cancel_after_begin(stub) - elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: - await _cancel_after_first_response(stub) - elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: - await _timeout_on_sleeping_server(stub) - elif self is TestCase.EMPTY_STREAM: - await _empty_stream(stub) - elif self is TestCase.STATUS_CODE_AND_MESSAGE: - await _status_code_and_message(stub) - elif self is TestCase.UNIMPLEMENTED_METHOD: - await _unimplemented_method(stub) - elif self is TestCase.UNIMPLEMENTED_SERVICE: - await _unimplemented_service(stub) - elif self is TestCase.CUSTOM_METADATA: - await _custom_metadata(stub) - elif self is TestCase.COMPUTE_ENGINE_CREDS: - await _compute_engine_creds(stub, args) - elif self is TestCase.OAUTH2_AUTH_TOKEN: - await _oauth2_auth_token(stub, args) - elif self is TestCase.JWT_TOKEN_CREDS: - await _jwt_token_creds(stub, args) - elif self is TestCase.PER_RPC_CREDS: - await _per_rpc_creds(stub, args) - elif self is TestCase.SPECIAL_STATUS_MESSAGE: - await _special_status_message(stub, args) + +_TEST_CASE_IMPLEMENTATION_MAPPING = { + TestCase.EMPTY_UNARY: _empty_unary, + TestCase.LARGE_UNARY: _large_unary, + TestCase.SERVER_STREAMING: _server_streaming, + TestCase.CLIENT_STREAMING: _client_streaming, + TestCase.PING_PONG: _ping_pong, + TestCase.CANCEL_AFTER_BEGIN: _cancel_after_begin, + TestCase.CANCEL_AFTER_FIRST_RESPONSE: _cancel_after_first_response, + TestCase.TIMEOUT_ON_SLEEPING_SERVER: _timeout_on_sleeping_server, + TestCase.EMPTY_STREAM: _empty_stream, + TestCase.STATUS_CODE_AND_MESSAGE: _status_code_and_message, + TestCase.UNIMPLEMENTED_METHOD: _unimplemented_method, + TestCase.UNIMPLEMENTED_SERVICE: _unimplemented_service, + TestCase.CUSTOM_METADATA: _custom_metadata, + TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds, + TestCase.OAUTH2_AUTH_TOKEN: _oauth2_auth_token, + TestCase.JWT_TOKEN_CREDS: _jwt_token_creds, + TestCase.PER_RPC_CREDS: _per_rpc_creds, + TestCase.SPECIAL_STATUS_MESSAGE: _special_status_message, +} + + +async def test_interoperability(case: TestCase, + stub: test_pb2_grpc.TestServiceStub, + args: Optional[argparse.Namespace] = None + ) -> None: + method = _TEST_CASE_IMPLEMENTATION_MAPPING.get(case) + if method is None: + raise NotImplementedError(f'Test case "{case}" not implemented!') + else: + num_params = len(inspect.signature(method).parameters) + if num_params == 1: + await method(stub) + elif num_params == 2: + if args is not None: + await method(stub, args) + else: + raise ValueError(f'Failed to run case [{case}]: args is None') else: - raise NotImplementedError('Test case "%s" not implemented!' % - self.name) + raise ValueError(f'Invalid number of parameters [{num_params}]')