diff --git a/setup.cfg b/setup.cfg index 87a9e6bad2a..58aeee79525 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,5 +24,7 @@ inputs = src/python/grpcio/grpc/experimental src/python/grpcio_tests/tests_aio -disable = - import-error +# NOTE(lidiz) +# import-error: "Can't find module 'grpc._cython.cygrpc'." +# module-attr: pytype cannot understand the namespace packages by Google. +disable = "import-error,module-attr" diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 440476c333b..e7378d6d5d5 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -235,12 +235,15 @@ class RunInterop(test.test): description = 'run interop test client/server' user_options = [('args=', 'a', 'pass-thru arguments for the client/server'), ('client', 'c', 'flag indicating to run the client'), - ('server', 's', 'flag indicating to run the server')] + ('server', 's', 'flag indicating to run the server'), + ('asyncio', 'i', 'flag indicating to run the asyncio stack') + ] def initialize_options(self): self.args = '' self.client = False self.server = False + self.asyncio = False def finalize_options(self): if self.client and self.server: @@ -261,9 +264,15 @@ class RunInterop(test.test): def run_server(self): # We import here to ensure that our setuptools parent has had a chance to # edit the Python system path. - from tests.interop import server - sys.argv[1:] = self.args.split() - server.serve() + if self.asyncio: + import asyncio + from tests_aio.interop import server + sys.argv[1:] = self.args.split() + asyncio.get_event_loop().run_until_complete(server.serve()) + else: + from tests.interop import server + sys.argv[1:] = self.args.split() + server.serve() def run_client(self): # We import here to ensure that our setuptools parent has had a chance to diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 2d490ff333d..71b2fecef18 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -25,7 +25,7 @@ from tests.interop import methods from tests.interop import resources -def _args(): +def parse_interop_client_args(): parser = argparse.ArgumentParser() parser.add_argument('--server_host', default="localhost", @@ -59,49 +59,63 @@ def _args(): return parser.parse_args() -def _stub(args): - target = '{}:{}'.format(args.server_host, args.server_port) +def _create_call_credentials(args): if args.test_case == 'oauth2_auth_token': google_credentials, unused_project_id = google_auth.default( scopes=[args.oauth_scope]) google_credentials.refresh(google_auth.transport.requests.Request()) - call_credentials = grpc.access_token_call_credentials( - google_credentials.token) + return grpc.access_token_call_credentials(google_credentials.token) elif args.test_case == 'compute_engine_creds': google_credentials, unused_project_id = google_auth.default( scopes=[args.oauth_scope]) - call_credentials = grpc.metadata_call_credentials( + return grpc.metadata_call_credentials( google_auth.transport.grpc.AuthMetadataPlugin( credentials=google_credentials, request=google_auth.transport.requests.Request())) elif args.test_case == 'jwt_token_creds': google_credentials = google_auth_jwt.OnDemandCredentials.from_service_account_file( os.environ[google_auth.environment_vars.CREDENTIALS]) - call_credentials = grpc.metadata_call_credentials( + return grpc.metadata_call_credentials( google_auth.transport.grpc.AuthMetadataPlugin( credentials=google_credentials, request=None)) else: - call_credentials = None + return None + + +def get_secure_channel_parameters(args): + call_credentials = _create_call_credentials(args) + + if args.use_test_ca: + root_certificates = resources.test_root_certificates() + else: + root_certificates = None # will load default roots. + + channel_credentials = grpc.ssl_channel_credentials(root_certificates) + if call_credentials is not None: + channel_credentials = grpc.composite_channel_credentials( + channel_credentials, call_credentials) + + channel_opts = None + if args.server_host_override: + channel_opts = (( + 'grpc.ssl_target_name_override', + args.server_host_override, + ),) + + return channel_credentials, channel_opts + + +def _create_channel(args): + target = '{}:{}'.format(args.server_host, args.server_port) + if args.use_tls: - if args.use_test_ca: - root_certificates = resources.test_root_certificates() - else: - root_certificates = None # will load default roots. - - channel_credentials = grpc.ssl_channel_credentials(root_certificates) - if call_credentials is not None: - channel_credentials = grpc.composite_channel_credentials( - channel_credentials, call_credentials) - - channel_opts = None - if args.server_host_override: - channel_opts = (( - 'grpc.ssl_target_name_override', - args.server_host_override, - ),) - channel = grpc.secure_channel(target, channel_credentials, channel_opts) + channel_credentials, options = get_secure_channel_parameters(args) + return grpc.secure_channel(target, channel_credentials, options) else: - channel = grpc.insecure_channel(target) + return grpc.insecure_channel(target) + + +def create_stub(channel, args): if args.test_case == "unimplemented_service": return test_pb2_grpc.UnimplementedServiceStub(channel) else: @@ -117,8 +131,9 @@ def _test_case_from_arg(test_case_arg): def test_interoperability(): - args = _args() - stub = _stub(args) + args = parse_interop_client_args() + channel = _create_channel(args) + stub = create_stub(channel, args) test_case = _test_case_from_arg(args.test_case) test_case.test_interoperability(stub, args) diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py index 15a4cd1d599..0a0061f5b1c 100644 --- a/src/python/grpcio_tests/tests/interop/server.py +++ b/src/python/grpcio_tests/tests/interop/server.py @@ -28,7 +28,7 @@ logging.basicConfig() _LOGGER = logging.getLogger(__name__) -def serve(): +def parse_interop_server_arguments(): parser = argparse.ArgumentParser() parser.add_argument('--port', type=int, @@ -38,16 +38,23 @@ def serve(): default=False, type=resources.parse_bool, help='require a secure connection') - args = parser.parse_args() + return parser.parse_args() + + +def get_server_credentials(): + private_key = resources.private_key() + certificate_chain = resources.certificate_chain() + return grpc.ssl_server_credentials(((private_key, certificate_chain),)) + + +def serve(): + args = parse_interop_server_arguments() server = test_common.test_server() test_pb2_grpc.add_TestServiceServicer_to_server(service.TestService(), server) if args.use_tls: - private_key = resources.private_key() - certificate_chain = resources.certificate_chain() - credentials = grpc.ssl_server_credentials( - ((private_key, certificate_chain),)) + credentials = get_server_credentials() server.add_secure_port('[::]:{}'.format(args.port), credentials) else: server.add_insecure_port('[::]:{}'.format(args.port)) diff --git a/src/python/grpcio_tests/tests_aio/interop/BUILD.bazel b/src/python/grpcio_tests/tests_aio/interop/BUILD.bazel new file mode 100644 index 00000000000..8dfa744805d --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/interop/BUILD.bazel @@ -0,0 +1,77 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("@grpc_python_dependencies//:requirements.bzl", "requirement") + +package( + default_testonly = 1, + default_visibility = ["//visibility:public"], +) + +py_library( + name = "methods", + srcs = ["methods.py"], + imports = ["../../"], + deps = [ + "//src/proto/grpc/testing:empty_py_pb2", + "//src/proto/grpc/testing:py_messages_proto", + "//src/proto/grpc/testing:py_test_proto", + "//src/proto/grpc/testing:test_py_pb2_grpc", + "//src/python/grpcio/grpc:grpcio", + requirement("google-auth"), + requirement("requests"), + requirement("urllib3"), + requirement("chardet"), + requirement("certifi"), + requirement("idna"), + ], +) + +py_test( + name = "local_interop_test", + size = "small", + srcs = ["local_interop_test.py"], + imports = ["../../"], + python_version = "PY3", + deps = [ + ":methods", + "//src/python/grpcio_tests/tests/interop:resources", + "//src/python/grpcio_tests/tests_aio/unit:_test_base", + "//src/python/grpcio_tests/tests_aio/unit:_test_server", + ], +) + +py_binary( + name = "server", + srcs = ["server.py"], + imports = ["../../"], + python_version = "PY3", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_tests/tests/interop:server", + "//src/python/grpcio_tests/tests_aio/unit:_test_server", + ], +) + +py_binary( + name = "client", + srcs = ["client.py"], + imports = ["../../"], + python_version = "PY3", + deps = [ + ":methods", + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_tests/tests/interop:client", + ], +) diff --git a/src/python/grpcio_tests/tests_aio/interop/__init__.py b/src/python/grpcio_tests/tests_aio/interop/__init__.py new file mode 100644 index 00000000000..b71ddbd314c --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/interop/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_aio/interop/client.py b/src/python/grpcio_tests/tests_aio/interop/client.py new file mode 100644 index 00000000000..3ad548d5210 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/interop/client.py @@ -0,0 +1,62 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import asyncio +import logging +import os + +import grpc +from grpc.experimental import aio + +from tests.interop import client as interop_client_lib +from tests_aio.interop import methods + +logging.basicConfig(level=logging.DEBUG) +_LOGGER = logging.getLogger(__name__) +_LOGGER.setLevel(logging.DEBUG) + + +def _create_channel(args): + target = '{}:{}'.format(args.server_host, args.server_port) + + if args.use_tls: + channel_credentials, options = interop_client_lib.get_secure_channel_parameters( + args) + return aio.secure_channel(target, channel_credentials, options) + else: + return aio.insecure_channel(target) + + +def _test_case_from_arg(test_case_arg): + for test_case in methods.TestCase: + if test_case_arg == test_case.value: + return test_case + else: + raise ValueError('No test case "%s"!' % test_case_arg) + + +async def test_interoperability(): + aio.init_grpc_aio() + + args = interop_client_lib.parse_interop_client_args() + channel = _create_channel(args) + stub = interop_client_lib.create_stub(channel, args) + test_case = _test_case_from_arg(args.test_case) + await test_case.test_interoperability(stub, args) + + +if __name__ == '__main__': + asyncio.get_event_loop().set_debug(True) + asyncio.get_event_loop().run_until_complete(test_interoperability()) diff --git a/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py b/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py new file mode 100644 index 00000000000..e19fdde92b0 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py @@ -0,0 +1,86 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Conducts interop tests locally.""" + +import logging +import unittest + +import grpc +from grpc.experimental import aio + +from src.proto.grpc.testing import test_pb2_grpc +from tests.interop import resources +from tests_aio.interop import methods +from tests_aio.unit._test_base import AioTestBase +from tests_aio.unit._test_server import start_test_server + +_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' + + +class InteropTestCaseMixin: + """Unit test methods. + + This class must be mixed in with unittest.TestCase and a class that defines + setUp and tearDown methods that manage a stub attribute. + """ + _stub: test_pb2_grpc.TestServiceStub + + async def test_empty_unary(self): + await methods.TestCase.EMPTY_UNARY.test_interoperability( + self._stub, None) + + async def test_large_unary(self): + await methods.TestCase.LARGE_UNARY.test_interoperability( + self._stub, None) + + async def test_server_streaming(self): + await methods.TestCase.SERVER_STREAMING.test_interoperability( + self._stub, None) + + async def test_client_streaming(self): + await methods.TestCase.CLIENT_STREAMING.test_interoperability( + self._stub, None) + + async def test_ping_pong(self): + await methods.TestCase.PING_PONG.test_interoperability(self._stub, None) + + async def test_cancel_after_begin(self): + await methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability( + self._stub, None) + + async def test_cancel_after_first_response(self): + await methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability( + self._stub, None) + + @unittest.skip('TODO(https://github.com/grpc/grpc/issues/21707)') + async def test_timeout_on_sleeping_server(self): + await methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability( + self._stub, None) + + +class InsecureLocalInteropTest(InteropTestCaseMixin, AioTestBase): + + async def setUp(self): + address, self._server = await start_test_server() + self._channel = aio.insecure_channel(address) + self._stub = test_pb2_grpc.TestServiceStub(self._channel) + + async def tearDown(self): + await self._channel.close() + await self._server.stop(None) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/interop/methods.py b/src/python/grpcio_tests/tests_aio/interop/methods.py new file mode 100644 index 00000000000..c8697a4d326 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/interop/methods.py @@ -0,0 +1,436 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementations of interoperability test methods.""" + +import enum +import asyncio +from typing import Any, Union, Optional +import json +import os +import threading +import time + +import grpc +from grpc.experimental import aio +from google import auth as google_auth +from google.auth import environment_vars as google_auth_environment_vars +from google.auth.transport import grpc as google_auth_transport_grpc +from google.auth.transport import requests as google_auth_transport_requests + +from src.proto.grpc.testing import empty_pb2, messages_pb2, test_pb2_grpc + +_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" +_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" + + +async def _expect_status_code(call: aio.Call, + expected_code: grpc.StatusCode) -> None: + code = await call.code() + if code != expected_code: + raise ValueError('expected code %s, got %s' % + (expected_code, call.code())) + + +async def _expect_status_details(call: aio.Call, expected_details: str) -> None: + details = await call.details() + if details != expected_details: + raise ValueError('expected message %s, got %s' % + (expected_details, call.details())) + + +async def _validate_status_code_and_details(call: aio.Call, + expected_code: grpc.StatusCode, + expected_details: str) -> None: + await _expect_status_code(call, expected_code) + await _expect_status_details(call, expected_details) + + +def _validate_payload_type_and_length( + response: Union[messages_pb2.SimpleResponse, messages_pb2. + StreamingOutputCallResponse], expected_type: Any, + expected_length: int) -> None: + if response.payload.type is not expected_type: + raise ValueError('expected payload type %s, got %s' % + (expected_type, type(response.payload.type))) + elif len(response.payload.body) != expected_length: + raise ValueError('expected payload body size %d, got %d' % + (expected_length, len(response.payload.body))) + + +async def _large_unary_common_behavior( + stub: test_pb2_grpc.TestServiceStub, fill_username: bool, + fill_oauth_scope: bool, call_credentials: Optional[grpc.CallCredentials] +) -> messages_pb2.SimpleResponse: + size = 314159 + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=size, + payload=messages_pb2.Payload(body=b'\x00' * 271828), + fill_username=fill_username, + fill_oauth_scope=fill_oauth_scope) + response = await stub.UnaryCall(request, credentials=call_credentials) + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) + return response + + +async def _empty_unary(stub: test_pb2_grpc.TestServiceStub) -> None: + response = await stub.EmptyCall(empty_pb2.Empty()) + if not isinstance(response, empty_pb2.Empty): + raise TypeError('response is of type "%s", not empty_pb2.Empty!' % + type(response)) + + +async def _large_unary(stub: test_pb2_grpc.TestServiceStub) -> None: + await _large_unary_common_behavior(stub, False, False, None) + + +async def _client_streaming(stub: test_pb2_grpc.TestServiceStub) -> None: + payload_body_sizes = ( + 27182, + 8, + 1828, + 45904, + ) + + async def request_gen(): + for size in payload_body_sizes: + yield messages_pb2.StreamingInputCallRequest( + payload=messages_pb2.Payload(body=b'\x00' * size)) + + response = await stub.StreamingInputCall(request_gen()) + if response.aggregated_payload_size != sum(payload_body_sizes): + raise ValueError('incorrect size %d!' % + response.aggregated_payload_size) + + +async def _server_streaming(stub: test_pb2_grpc.TestServiceStub) -> None: + sizes = ( + 31415, + 9, + 2653, + 58979, + ) + + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=sizes[0]), + messages_pb2.ResponseParameters(size=sizes[1]), + messages_pb2.ResponseParameters(size=sizes[2]), + messages_pb2.ResponseParameters(size=sizes[3]), + )) + call = stub.StreamingOutputCall(request) + for size in sizes: + response = await call.read() + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, + size) + + +async def _ping_pong(stub: test_pb2_grpc.TestServiceStub) -> None: + request_response_sizes = ( + 31415, + 9, + 2653, + 58979, + ) + request_payload_sizes = ( + 27182, + 8, + 1828, + 45904, + ) + + call = stub.FullDuplexCall() + for response_size, payload_size in zip(request_response_sizes, + request_payload_sizes): + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters( + size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + + await call.write(request) + response = await call.read() + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, + response_size) + + +async def _cancel_after_begin(stub: test_pb2_grpc.TestServiceStub): + call = stub.StreamingInputCall() + call.cancel() + if not call.cancelled(): + raise ValueError('expected cancelled method to return True') + code = await call.code() + if code is not grpc.StatusCode.CANCELLED: + raise ValueError('expected status code CANCELLED') + + +async def _cancel_after_first_response(stub: test_pb2_grpc.TestServiceStub): + request_response_sizes = ( + 31415, + 9, + 2653, + 58979, + ) + request_payload_sizes = ( + 27182, + 8, + 1828, + 45904, + ) + + call = stub.FullDuplexCall() + + response_size = request_response_sizes[0] + payload_size = request_payload_sizes[0] + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters( + size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + + await call.write(request) + await call.read() + + call.cancel() + + try: + await call.read() + except asyncio.CancelledError: + assert await call.code() is grpc.StatusCode.CANCELLED + else: + raise ValueError('expected call to be cancelled') + + +async def _timeout_on_sleeping_server(stub: test_pb2_grpc.TestServiceStub): + request_payload_size = 27182 + + call = stub.FullDuplexCall(timeout=0.001) + + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) + await call.write(request) + await call.done_writing() + try: + await call.read() + except aio.AioRpcError as rpc_error: + if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED: + raise + else: + raise ValueError('expected call to exceed deadline') + + +async def _empty_stream(stub: test_pb2_grpc.TestServiceStub): + call = stub.FullDuplexCall() + await call.done_writing() + assert await call.read() == aio.EOF + + +async def _status_code_and_message(stub: test_pb2_grpc.TestServiceStub): + details = 'test status message' + code = 2 + status = grpc.StatusCode.UNKNOWN # code = 2 + + # Test with a UnaryCall + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=1, + payload=messages_pb2.Payload(body=b'\x00'), + response_status=messages_pb2.EchoStatus(code=code, message=details)) + call = stub.UnaryCall(request) + await _validate_status_code_and_details(call, status, details) + + # Test with a FullDuplexCall + call = stub.FullDuplexCall() + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters(size=1),), + payload=messages_pb2.Payload(body=b'\x00'), + response_status=messages_pb2.EchoStatus(code=code, message=details)) + await call.write(request) # sends the initial request. + await call.done_writing() + await _validate_status_code_and_details(call, status, details) + + +async def _unimplemented_method(stub: test_pb2_grpc.TestServiceStub): + call = stub.UnimplementedCall(empty_pb2.Empty()) + await _expect_status_code(call, grpc.StatusCode.UNIMPLEMENTED) + + +async def _unimplemented_service(stub: test_pb2_grpc.UnimplementedServiceStub): + call = stub.UnimplementedCall(empty_pb2.Empty()) + await _expect_status_code(call, grpc.StatusCode.UNIMPLEMENTED) + + +async def _custom_metadata(stub: test_pb2_grpc.TestServiceStub): + initial_metadata_value = "test_initial_metadata_value" + trailing_metadata_value = b"\x0a\x0b\x0a\x0b\x0a\x0b" + metadata = ((_INITIAL_METADATA_KEY, initial_metadata_value), + (_TRAILING_METADATA_KEY, trailing_metadata_value)) + + async def _validate_metadata(call): + initial_metadata = dict(await call.initial_metadata()) + if initial_metadata[_INITIAL_METADATA_KEY] != initial_metadata_value: + raise ValueError('expected initial metadata %s, got %s' % + (initial_metadata_value, + initial_metadata[_INITIAL_METADATA_KEY])) + trailing_metadata = dict(await call.trailing_metadata()) + if trailing_metadata[_TRAILING_METADATA_KEY] != trailing_metadata_value: + raise ValueError('expected trailing metadata %s, got %s' % + (trailing_metadata_value, + trailing_metadata[_TRAILING_METADATA_KEY])) + + # Testing with UnaryCall + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=1, + payload=messages_pb2.Payload(body=b'\x00')) + call = stub.UnaryCall(request, metadata=metadata) + await _validate_metadata(call) + + # Testing with FullDuplexCall + call = stub.FullDuplexCall(metadata=metadata) + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters(size=1),)) + await call.write(request) + await call.read() + await call.done_writing() + await _validate_metadata(call) + + +async def _compute_engine_creds(stub: test_pb2_grpc.TestServiceStub, args): + response = await _large_unary_common_behavior(stub, True, True, None) + if args.default_service_account != response.username: + raise ValueError('expected username %s, got %s' % + (args.default_service_account, response.username)) + + +async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub, args): + json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] + response = await _large_unary_common_behavior(stub, True, True, None) + if wanted_email != response.username: + raise ValueError('expected username %s, got %s' % + (wanted_email, response.username)) + if args.oauth_scope.find(response.oauth_scope) == -1: + raise ValueError( + 'expected to find oauth scope "{}" in received "{}"'.format( + response.oauth_scope, args.oauth_scope)) + + +async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub, unused_args): + json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] + response = await _large_unary_common_behavior(stub, True, False, None) + if wanted_email != response.username: + raise ValueError('expected username %s, got %s' % + (wanted_email, response.username)) + + +async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub, args): + json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] + google_credentials, unused_project_id = google_auth.default( + scopes=[args.oauth_scope]) + call_credentials = grpc.metadata_call_credentials( + google_auth_transport_grpc.AuthMetadataPlugin( + credentials=google_credentials, + request=google_auth_transport_requests.Request())) + response = await _large_unary_common_behavior(stub, True, False, + call_credentials) + if wanted_email != response.username: + raise ValueError('expected username %s, got %s' % + (wanted_email, response.username)) + + +async def _special_status_message(stub: test_pb2_grpc.TestServiceStub, args): + details = b'\t\ntest with whitespace\r\nand Unicode BMP \xe2\x98\xba and non-BMP \xf0\x9f\x98\x88\t\n'.decode( + 'utf-8') + code = 2 + status = grpc.StatusCode.UNKNOWN # code = 2 + + # Test with a UnaryCall + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=1, + payload=messages_pb2.Payload(body=b'\x00'), + response_status=messages_pb2.EchoStatus(code=code, message=details)) + call = stub.UnaryCall(request) + await _validate_status_code_and_details(call, status, details) + + +@enum.unique +class TestCase(enum.Enum): + EMPTY_UNARY = 'empty_unary' + LARGE_UNARY = 'large_unary' + SERVER_STREAMING = 'server_streaming' + CLIENT_STREAMING = 'client_streaming' + PING_PONG = 'ping_pong' + CANCEL_AFTER_BEGIN = 'cancel_after_begin' + CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' + EMPTY_STREAM = 'empty_stream' + STATUS_CODE_AND_MESSAGE = 'status_code_and_message' + UNIMPLEMENTED_METHOD = 'unimplemented_method' + UNIMPLEMENTED_SERVICE = 'unimplemented_service' + CUSTOM_METADATA = "custom_metadata" + COMPUTE_ENGINE_CREDS = 'compute_engine_creds' + OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' + JWT_TOKEN_CREDS = 'jwt_token_creds' + PER_RPC_CREDS = 'per_rpc_creds' + TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' + SPECIAL_STATUS_MESSAGE = 'special_status_message' + + async def test_interoperability(self, stub: test_pb2_grpc.TestServiceStub, + args) -> None: + if self is TestCase.EMPTY_UNARY: + await _empty_unary(stub) + elif self is TestCase.LARGE_UNARY: + await _large_unary(stub) + elif self is TestCase.SERVER_STREAMING: + await _server_streaming(stub) + elif self is TestCase.CLIENT_STREAMING: + await _client_streaming(stub) + elif self is TestCase.PING_PONG: + await _ping_pong(stub) + elif self is TestCase.CANCEL_AFTER_BEGIN: + await _cancel_after_begin(stub) + elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: + await _cancel_after_first_response(stub) + elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: + await _timeout_on_sleeping_server(stub) + elif self is TestCase.EMPTY_STREAM: + await _empty_stream(stub) + elif self is TestCase.STATUS_CODE_AND_MESSAGE: + await _status_code_and_message(stub) + elif self is TestCase.UNIMPLEMENTED_METHOD: + await _unimplemented_method(stub) + elif self is TestCase.UNIMPLEMENTED_SERVICE: + await _unimplemented_service(stub) + elif self is TestCase.CUSTOM_METADATA: + await _custom_metadata(stub) + elif self is TestCase.COMPUTE_ENGINE_CREDS: + await _compute_engine_creds(stub, args) + elif self is TestCase.OAUTH2_AUTH_TOKEN: + await _oauth2_auth_token(stub, args) + elif self is TestCase.JWT_TOKEN_CREDS: + await _jwt_token_creds(stub, args) + elif self is TestCase.PER_RPC_CREDS: + await _per_rpc_creds(stub, args) + elif self is TestCase.SPECIAL_STATUS_MESSAGE: + await _special_status_message(stub, args) + else: + raise NotImplementedError('Test case "%s" not implemented!' % + self.name) diff --git a/src/python/grpcio_tests/tests_aio/interop/server.py b/src/python/grpcio_tests/tests_aio/interop/server.py new file mode 100644 index 00000000000..5a5180075a3 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/interop/server.py @@ -0,0 +1,53 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The gRPC interoperability test server using AsyncIO stack.""" + +import asyncio +import argparse +import logging + +import grpc +from grpc.experimental.aio import init_grpc_aio + +from tests.interop import server as interop_server_lib +from tests_aio.unit import _test_server + +logging.basicConfig(level=logging.DEBUG) +_LOGGER = logging.getLogger(__name__) +_LOGGER.setLevel(logging.DEBUG) + + +async def serve(): + init_grpc_aio() + + args = interop_server_lib.parse_interop_server_arguments() + + if args.use_tls: + credentials = interop_server_lib.get_server_credentials() + + address, server = await _test_server.start_test_server( + port=args.port, secure=True, server_credentials=credentials) + else: + address, server = await _test_server.start_test_server( + port=args.port, + secure=False, + ) + + _LOGGER.info('Server serving at %s', address) + await server.wait_for_termination() + _LOGGER.info('Server stopped; exiting.') + + +if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(serve()) diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index 65b34431f00..35e2606f93b 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -1,5 +1,6 @@ [ "_sanity._sanity_test.AioSanityTest", + "interop.local_interop_test.InsecureLocalInteropTest", "unit.abort_test.TestAbort", "unit.aio_rpc_error_test.TestAioRpcError", "unit.call_test.TestStreamStreamCall", diff --git a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel index 6f3b5aad39f..1e6dd4720f6 100644 --- a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel @@ -25,11 +25,18 @@ py_library( srcs_version = "PY3", ) +py_library( + name = "_constants", + srcs = ["_constants.py"], + srcs_version = "PY3", +) + py_library( name = "_test_server", srcs = ["_test_server.py"], srcs_version = "PY3", deps = [ + ":_constants", "//src/proto/grpc/testing:empty_py_pb2", "//src/proto/grpc/testing:py_messages_proto", "//src/proto/grpc/testing:test_py_pb2_grpc", @@ -37,12 +44,6 @@ py_library( ], ) -py_library( - name = "_constants", - srcs = ["_constants.py"], - srcs_version = "PY3", -) - py_library( name = "_common", srcs = ["_common.py"], diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_server.py b/src/python/grpcio_tests/tests_aio/unit/_test_server.py index b289b67dbb0..5a135c5c84f 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_test_server.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_server.py @@ -17,14 +17,13 @@ import datetime import logging import grpc - from grpc.experimental import aio -from src.proto.grpc.testing import messages_pb2, test_pb2_grpc -from tests_aio.unit._constants import UNARY_CALL_WITH_SLEEP_VALUE +from src.proto.grpc.testing import empty_pb2, messages_pb2, test_pb2_grpc +from tests_aio.unit import _constants -_INITIAL_METADATA_KEY = "initial-md-key" -_TRAILING_METADATA_KEY = "trailing-md-key-bin" +_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" +_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" async def _maybe_echo_metadata(servicer_context): @@ -42,9 +41,14 @@ async def _maybe_echo_metadata(servicer_context): class _TestServiceServicer(test_pb2_grpc.TestServiceServicer): - async def UnaryCall(self, unused_request, context): + async def UnaryCall(self, request, context): await _maybe_echo_metadata(context) - return messages_pb2.SimpleResponse() + return messages_pb2.SimpleResponse( + payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE, + body=b'\x00' * request.response_size)) + + async def EmptyCall(self, request, context): + return empty_pb2.Empty() async def StreamingOutputCall( self, request: messages_pb2.StreamingOutputCallRequest, @@ -62,8 +66,8 @@ class _TestServiceServicer(test_pb2_grpc.TestServiceServicer): # Next methods are extra ones that are registred programatically # when the sever is instantiated. They are not being provided by # the proto file. - async def UnaryCallWithSleep(self, request, context): - await asyncio.sleep(UNARY_CALL_WITH_SLEEP_VALUE) + async def UnaryCallWithSleep(self, unused_request, unused_context): + await asyncio.sleep(_constants.UNARY_CALL_WITH_SLEEP_VALUE) return messages_pb2.SimpleResponse() async def StreamingInputCall(self, request_async_iterator, unused_context): @@ -87,11 +91,7 @@ class _TestServiceServicer(test_pb2_grpc.TestServiceServicer): response_parameters.size)) -async def start_test_server(port=0, secure=False): - server = aio.server(options=(('grpc.so_reuseport', 0),)) - servicer = _TestServiceServicer() - test_pb2_grpc.add_TestServiceServicer_to_server(servicer, server) - +def _create_extra_generic_handler(servicer: _TestServiceServicer): # Add programatically extra methods not provided by the proto file # that are used during the tests rpc_method_handlers = { @@ -102,16 +102,24 @@ async def start_test_server(port=0, secure=False): response_serializer=messages_pb2.SimpleResponse. SerializeToString) } - extra_handler = grpc.method_handlers_generic_handler( - 'grpc.testing.TestService', rpc_method_handlers) - server.add_generic_rpc_handlers((extra_handler,)) + return grpc.method_handlers_generic_handler('grpc.testing.TestService', + rpc_method_handlers) + + +async def start_test_server(port=0, secure=False, server_credentials=None): + server = aio.server(options=(('grpc.so_reuseport', 0),)) + servicer = _TestServiceServicer() + test_pb2_grpc.add_TestServiceServicer_to_server(servicer, server) + + server.add_generic_rpc_handlers((_create_extra_generic_handler(servicer),)) if secure: - server_credentials = grpc.local_server_credentials( - grpc.LocalConnectionType.LOCAL_TCP) - port = server.add_secure_port(f'[::]:{port}', server_credentials) + if server_credentials is None: + server_credentials = grpc.local_server_credentials( + grpc.LocalConnectionType.LOCAL_TCP) + port = server.add_secure_port('[::]:%d' % port, server_credentials) else: - port = server.add_insecure_port(f'[::]:{port}') + port = server.add_insecure_port('[::]:%d' % port) await server.start() diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index 77c7fabdc20..65dcc0a4e2b 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -33,8 +33,8 @@ _UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep' _STREAMING_OUTPUT_CALL_METHOD = '/grpc.testing.TestService/StreamingOutputCall' _INVOCATION_METADATA = ( - ('initial-md-key', 'initial-md-value'), - ('trailing-md-key-bin', b'\x00\x02'), + ('x-grpc-test-echo-initial', 'initial-md-value'), + ('x-grpc-test-echo-trailing-bin', b'\x00\x02'), ) _NUM_STREAM_RESPONSES = 5 diff --git a/tools/dockerfile/interoptest/grpc_interop_pythonasyncio/Dockerfile b/tools/dockerfile/interoptest/grpc_interop_pythonasyncio/Dockerfile new file mode 100644 index 00000000000..3b07aabd6d5 --- /dev/null +++ b/tools/dockerfile/interoptest/grpc_interop_pythonasyncio/Dockerfile @@ -0,0 +1,68 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM debian:stretch + +# Install Git and basic packages. +RUN apt-get update && apt-get install -y \ + autoconf \ + autotools-dev \ + build-essential \ + bzip2 \ + ccache \ + curl \ + dnsutils \ + gcc \ + gcc-multilib \ + git \ + golang \ + gyp \ + lcov \ + libc6 \ + libc6-dbg \ + libc6-dev \ + libgtest-dev \ + libtool \ + make \ + perl \ + strace \ + python-dev \ + python-setuptools \ + python-yaml \ + telnet \ + unzip \ + wget \ + zip && apt-get clean + +#================ +# Build profiling +RUN apt-get update && apt-get install -y time && apt-get clean + +# Google Cloud platform API libraries +RUN apt-get update && apt-get install -y python-pip && apt-get clean +RUN pip install --upgrade google-api-python-client oauth2client + +# Install Python 2.7 +RUN apt-get update && apt-get install -y python2.7 python-all-dev +RUN curl https://bootstrap.pypa.io/get-pip.py | python2.7 + +# Add Debian 'buster' repository, we will need it for installing newer versions of python +RUN echo 'deb http://ftp.de.debian.org/debian buster main' >> /etc/apt/sources.list +RUN echo 'APT::Default-Release "stretch";' | tee -a /etc/apt/apt.conf.d/00local + +RUN mkdir /var/local/jenkins + +# Install Python 3.7 +RUN apt-get update && apt-get -t stable install -y python3.7 python3-all-dev +RUN curl https://bootstrap.pypa.io/get-pip.py | python3.7 diff --git a/tools/dockerfile/interoptest/grpc_interop_pythonasyncio/build_interop.sh b/tools/dockerfile/interoptest/grpc_interop_pythonasyncio/build_interop.sh new file mode 100755 index 00000000000..c82935ce22c --- /dev/null +++ b/tools/dockerfile/interoptest/grpc_interop_pythonasyncio/build_interop.sh @@ -0,0 +1,32 @@ +#!/bin/bash +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Builds Python interop server and client in a base image. +set -e + +mkdir -p /var/local/git +git clone /var/local/jenkins/grpc /var/local/git/grpc +# clone gRPC submodules, use data from locally cloned submodules where possible +(cd /var/local/jenkins/grpc/ && git submodule foreach 'cd /var/local/git/grpc \ +&& git submodule update --init --reference /var/local/jenkins/grpc/${name} \ +${name}') + +# copy service account keys if available +cp -r /var/local/jenkins/service_account $HOME || true + +cd /var/local/git/grpc + +# interop tests only run using python3.7 currently (and python build is slow) +tools/run_tests/run_tests.py -l python --compiler python3.7 -c opt --build_only diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index 03b1e4d4da7..9ff3e461a69 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -56,7 +56,7 @@ LANG_RUNTIME_MATRIX = { 'cxx': ['cxx'], # This is actually debian8. 'go': ['go1.8', 'go1.11'], 'java': ['java'], - 'python': ['python'], + 'python': ['python', 'pythonasyncio'], 'node': ['node'], 'ruby': ['ruby'], 'php': ['php', 'php7'], diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py index be0758274b0..0d1c41cb0c4 100755 --- a/tools/run_tests/run_interop_tests.py +++ b/tools/run_tests/run_interop_tests.py @@ -668,6 +668,60 @@ class PythonLanguage: return 'python' +class PythonAsyncIOClient: + + def __init__(self): + self.client_cwd = None + self.server_cwd = None + self.http2_cwd = None + self.safename = str(self) + + def client_cmd(self, args): + return [ + 'py37_native/bin/python', 'src/python/grpcio_tests/setup.py', + 'run_interop', '--client', '--args="{}"'.format(' '.join(args)) + ] + + def client_cmd_http2interop(self, args): + return [ + 'py37_native/bin/python', + 'src/python/grpcio_tests/tests/http2/negative_http2_client.py', + ] + args + + def cloud_to_prod_env(self): + return {} + + def server_cmd(self, args): + return [ + 'py37_native/bin/python', 'src/python/grpcio_tests/setup.py', + 'run_interop', '--asyncio', '--server', + '--args="{}"'.format(' '.join(args)) + ] + + def global_env(self): + return { + 'LD_LIBRARY_PATH': '{}/libs/opt'.format(DOCKER_WORKDIR_ROOT), + 'PYTHONPATH': '{}/src/python/gens'.format(DOCKER_WORKDIR_ROOT) + } + + def unimplemented_test_cases(self): + # TODO(https://github.com/grpc/grpc/issues/21707) + return _SKIP_COMPRESSION + \ + _SKIP_DATA_FRAME_PADDING + \ + _AUTH_TEST_CASES + \ + ['timeout_on_sleeping_server'] + + def unimplemented_test_cases_server(self): + # TODO(https://github.com/grpc/grpc/issues/21749) + return _TEST_CASES + \ + _AUTH_TEST_CASES + \ + _HTTP2_TEST_CASES + \ + _HTTP2_SERVER_TEST_CASES + + def __str__(self): + return 'pythonasyncio' + + _LANGUAGES = { 'c++': CXXLanguage(), 'csharp': CSharpLanguage(), @@ -684,12 +738,13 @@ _LANGUAGES = { 'objc': ObjcLanguage(), 'ruby': RubyLanguage(), 'python': PythonLanguage(), + 'pythonasyncio': PythonAsyncIOClient(), } # languages supported as cloud_to_cloud servers _SERVERS = [ 'c++', 'node', 'csharp', 'csharpcoreclr', 'aspnetcore', 'java', 'go', - 'ruby', 'python', 'dart' + 'ruby', 'python', 'dart', 'pythonasyncio' ] _TEST_CASES = [