Restrict visibility & improve readability

pull/21714/head
Lidi Zheng 5 years ago
parent dc202bdf1f
commit 20a6edfe6e
  1. 2
      setup.cfg
  2. 5
      src/python/grpcio_tests/tests_aio/interop/BUILD.bazel
  3. 6
      src/python/grpcio_tests/tests_aio/interop/client.py
  4. 31
      src/python/grpcio_tests/tests_aio/interop/local_interop_test.py
  5. 105
      src/python/grpcio_tests/tests_aio/interop/methods.py

@ -25,6 +25,6 @@ inputs =
src/python/grpcio_tests/tests_aio src/python/grpcio_tests/tests_aio
# NOTE(lidiz) # NOTE(lidiz)
# import-error: "Can't find module 'grpc._cython.cygrpc'." # import-error: C extension triggers import-error.
# module-attr: pytype cannot understand the namespace packages by Google. # module-attr: pytype cannot understand the namespace packages by Google.
disable = "import-error,module-attr" disable = "import-error,module-attr"

@ -14,10 +14,7 @@
load("@grpc_python_dependencies//:requirements.bzl", "requirement") load("@grpc_python_dependencies//:requirements.bzl", "requirement")
package( package(default_testonly = 1)
default_testonly = 1,
default_visibility = ["//visibility:public"],
)
py_library( py_library(
name = "methods", name = "methods",

@ -23,13 +23,12 @@ from grpc.experimental import aio
from tests.interop import client as interop_client_lib from tests.interop import client as interop_client_lib
from tests_aio.interop import methods from tests_aio.interop import methods
logging.basicConfig(level=logging.DEBUG)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG) _LOGGER.setLevel(logging.DEBUG)
def _create_channel(args): def _create_channel(args):
target = '{}:{}'.format(args.server_host, args.server_port) target = f'{args.server_host}:{args.server_port}'
if args.use_tls: if args.use_tls:
channel_credentials, options = interop_client_lib.get_secure_channel_parameters( channel_credentials, options = interop_client_lib.get_secure_channel_parameters(
@ -54,9 +53,10 @@ async def test_interoperability():
channel = _create_channel(args) channel = _create_channel(args)
stub = interop_client_lib.create_stub(channel, args) stub = interop_client_lib.create_stub(channel, args)
test_case = _test_case_from_arg(args.test_case) test_case = _test_case_from_arg(args.test_case)
await test_case.test_interoperability(stub, args) await methods.test_interoperability(test_case, stub, args)
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
asyncio.get_event_loop().set_debug(True) asyncio.get_event_loop().set_debug(True)
asyncio.get_event_loop().run_until_complete(test_interoperability()) asyncio.get_event_loop().run_until_complete(test_interoperability())

@ -37,36 +37,37 @@ class InteropTestCaseMixin:
_stub: test_pb2_grpc.TestServiceStub _stub: test_pb2_grpc.TestServiceStub
async def test_empty_unary(self): async def test_empty_unary(self):
await methods.TestCase.EMPTY_UNARY.test_interoperability( await methods.test_interoperability(methods.TestCase.EMPTY_UNARY,
self._stub, None) self._stub, None)
async def test_large_unary(self): async def test_large_unary(self):
await methods.TestCase.LARGE_UNARY.test_interoperability( await methods.test_interoperability(methods.TestCase.LARGE_UNARY,
self._stub, None) self._stub, None)
async def test_server_streaming(self): async def test_server_streaming(self):
await methods.TestCase.SERVER_STREAMING.test_interoperability( await methods.test_interoperability(methods.TestCase.SERVER_STREAMING,
self._stub, None) self._stub, None)
async def test_client_streaming(self): async def test_client_streaming(self):
await methods.TestCase.CLIENT_STREAMING.test_interoperability( await methods.test_interoperability(methods.TestCase.CLIENT_STREAMING,
self._stub, None) self._stub, None)
async def test_ping_pong(self): async def test_ping_pong(self):
await methods.TestCase.PING_PONG.test_interoperability(self._stub, None) await methods.test_interoperability(methods.TestCase.PING_PONG,
self._stub, None)
async def test_cancel_after_begin(self): async def test_cancel_after_begin(self):
await methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability( await methods.test_interoperability(methods.TestCase.CANCEL_AFTER_BEGIN,
self._stub, None) self._stub, None)
async def test_cancel_after_first_response(self): async def test_cancel_after_first_response(self):
await methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability( await methods.test_interoperability(
self._stub, None) methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE, self._stub, None)
@unittest.skip('TODO(https://github.com/grpc/grpc/issues/21707)') @unittest.skip('TODO(https://github.com/grpc/grpc/issues/21707)')
async def test_timeout_on_sleeping_server(self): async def test_timeout_on_sleeping_server(self):
await methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability( await methods.test_interoperability(
self._stub, None) methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER, self._stub, None)
class InsecureLocalInteropTest(InteropTestCaseMixin, AioTestBase): class InsecureLocalInteropTest(InteropTestCaseMixin, AioTestBase):

@ -13,20 +13,23 @@
# limitations under the License. # limitations under the License.
"""Implementations of interoperability test methods.""" """Implementations of interoperability test methods."""
import enum import argparse
import asyncio import asyncio
from typing import Any, Union, Optional import enum
import collections
import inspect
import json import json
import os import os
import threading import threading
import time import time
from typing import Any, Optional, Union
import grpc import grpc
from grpc.experimental import aio
from google import auth as google_auth from google import auth as google_auth
from google.auth import environment_vars as google_auth_environment_vars from google.auth import environment_vars as google_auth_environment_vars
from google.auth.transport import grpc as google_auth_transport_grpc from google.auth.transport import grpc as google_auth_transport_grpc
from google.auth.transport import requests as google_auth_transport_requests from google.auth.transport import requests as google_auth_transport_requests
from grpc.experimental import aio
from src.proto.grpc.testing import empty_pb2, messages_pb2, test_pb2_grpc from src.proto.grpc.testing import empty_pb2, messages_pb2, test_pb2_grpc
@ -311,14 +314,16 @@ async def _custom_metadata(stub: test_pb2_grpc.TestServiceStub):
await _validate_metadata(call) await _validate_metadata(call)
async def _compute_engine_creds(stub: test_pb2_grpc.TestServiceStub, args): async def _compute_engine_creds(stub: test_pb2_grpc.TestServiceStub,
args: argparse.Namespace):
response = await _large_unary_common_behavior(stub, True, True, None) response = await _large_unary_common_behavior(stub, True, True, None)
if args.default_service_account != response.username: if args.default_service_account != response.username:
raise ValueError('expected username %s, got %s' % raise ValueError('expected username %s, got %s' %
(args.default_service_account, response.username)) (args.default_service_account, response.username))
async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub, args): async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub,
args: argparse.Namespace):
json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] wanted_email = json.load(open(json_key_filename, 'r'))['client_email']
response = await _large_unary_common_behavior(stub, True, True, None) response = await _large_unary_common_behavior(stub, True, True, None)
@ -331,7 +336,7 @@ async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub, args):
response.oauth_scope, args.oauth_scope)) response.oauth_scope, args.oauth_scope))
async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub, unused_args): async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub):
json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] wanted_email = json.load(open(json_key_filename, 'r'))['client_email']
response = await _large_unary_common_behavior(stub, True, False, None) response = await _large_unary_common_behavior(stub, True, False, None)
@ -340,7 +345,8 @@ async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub, unused_args):
(wanted_email, response.username)) (wanted_email, response.username))
async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub, args): async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub,
args: argparse.Namespace):
json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] wanted_email = json.load(open(json_key_filename, 'r'))['client_email']
google_credentials, unused_project_id = google_auth.default( google_credentials, unused_project_id = google_auth.default(
@ -356,7 +362,8 @@ async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub, args):
(wanted_email, response.username)) (wanted_email, response.username))
async def _special_status_message(stub: test_pb2_grpc.TestServiceStub, args): async def _special_status_message(stub: test_pb2_grpc.TestServiceStub,
args: argparse.Namespace):
details = b'\t\ntest with whitespace\r\nand Unicode BMP \xe2\x98\xba and non-BMP \xf0\x9f\x98\x88\t\n'.decode( details = b'\t\ntest with whitespace\r\nand Unicode BMP \xe2\x98\xba and non-BMP \xf0\x9f\x98\x88\t\n'.decode(
'utf-8') 'utf-8')
code = 2 code = 2
@ -381,6 +388,7 @@ class TestCase(enum.Enum):
PING_PONG = 'ping_pong' PING_PONG = 'ping_pong'
CANCEL_AFTER_BEGIN = 'cancel_after_begin' CANCEL_AFTER_BEGIN = 'cancel_after_begin'
CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
EMPTY_STREAM = 'empty_stream' EMPTY_STREAM = 'empty_stream'
STATUS_CODE_AND_MESSAGE = 'status_code_and_message' STATUS_CODE_AND_MESSAGE = 'status_code_and_message'
UNIMPLEMENTED_METHOD = 'unimplemented_method' UNIMPLEMENTED_METHOD = 'unimplemented_method'
@ -390,47 +398,46 @@ class TestCase(enum.Enum):
OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' OAUTH2_AUTH_TOKEN = 'oauth2_auth_token'
JWT_TOKEN_CREDS = 'jwt_token_creds' JWT_TOKEN_CREDS = 'jwt_token_creds'
PER_RPC_CREDS = 'per_rpc_creds' PER_RPC_CREDS = 'per_rpc_creds'
TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
SPECIAL_STATUS_MESSAGE = 'special_status_message' SPECIAL_STATUS_MESSAGE = 'special_status_message'
async def test_interoperability(self, stub: test_pb2_grpc.TestServiceStub,
args) -> None: _TEST_CASE_IMPLEMENTATION_MAPPING = {
if self is TestCase.EMPTY_UNARY: TestCase.EMPTY_UNARY: _empty_unary,
await _empty_unary(stub) TestCase.LARGE_UNARY: _large_unary,
elif self is TestCase.LARGE_UNARY: TestCase.SERVER_STREAMING: _server_streaming,
await _large_unary(stub) TestCase.CLIENT_STREAMING: _client_streaming,
elif self is TestCase.SERVER_STREAMING: TestCase.PING_PONG: _ping_pong,
await _server_streaming(stub) TestCase.CANCEL_AFTER_BEGIN: _cancel_after_begin,
elif self is TestCase.CLIENT_STREAMING: TestCase.CANCEL_AFTER_FIRST_RESPONSE: _cancel_after_first_response,
await _client_streaming(stub) TestCase.TIMEOUT_ON_SLEEPING_SERVER: _timeout_on_sleeping_server,
elif self is TestCase.PING_PONG: TestCase.EMPTY_STREAM: _empty_stream,
await _ping_pong(stub) TestCase.STATUS_CODE_AND_MESSAGE: _status_code_and_message,
elif self is TestCase.CANCEL_AFTER_BEGIN: TestCase.UNIMPLEMENTED_METHOD: _unimplemented_method,
await _cancel_after_begin(stub) TestCase.UNIMPLEMENTED_SERVICE: _unimplemented_service,
elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: TestCase.CUSTOM_METADATA: _custom_metadata,
await _cancel_after_first_response(stub) TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds,
elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: TestCase.OAUTH2_AUTH_TOKEN: _oauth2_auth_token,
await _timeout_on_sleeping_server(stub) TestCase.JWT_TOKEN_CREDS: _jwt_token_creds,
elif self is TestCase.EMPTY_STREAM: TestCase.PER_RPC_CREDS: _per_rpc_creds,
await _empty_stream(stub) TestCase.SPECIAL_STATUS_MESSAGE: _special_status_message,
elif self is TestCase.STATUS_CODE_AND_MESSAGE: }
await _status_code_and_message(stub)
elif self is TestCase.UNIMPLEMENTED_METHOD:
await _unimplemented_method(stub) async def test_interoperability(case: TestCase,
elif self is TestCase.UNIMPLEMENTED_SERVICE: stub: test_pb2_grpc.TestServiceStub,
await _unimplemented_service(stub) args: Optional[argparse.Namespace] = None
elif self is TestCase.CUSTOM_METADATA: ) -> None:
await _custom_metadata(stub) method = _TEST_CASE_IMPLEMENTATION_MAPPING.get(case)
elif self is TestCase.COMPUTE_ENGINE_CREDS: if method is None:
await _compute_engine_creds(stub, args) raise NotImplementedError(f'Test case "{case}" not implemented!')
elif self is TestCase.OAUTH2_AUTH_TOKEN: else:
await _oauth2_auth_token(stub, args) num_params = len(inspect.signature(method).parameters)
elif self is TestCase.JWT_TOKEN_CREDS: if num_params == 1:
await _jwt_token_creds(stub, args) await method(stub)
elif self is TestCase.PER_RPC_CREDS: elif num_params == 2:
await _per_rpc_creds(stub, args) if args is not None:
elif self is TestCase.SPECIAL_STATUS_MESSAGE: await method(stub, args)
await _special_status_message(stub, args) else:
raise ValueError(f'Failed to run case [{case}]: args is None')
else: else:
raise NotImplementedError('Test case "%s" not implemented!' % raise ValueError(f'Invalid number of parameters [{num_params}]')
self.name)

Loading…
Cancel
Save