diff --git a/.gitignore b/.gitignore index ed1669c4ccc..06e8706f9a8 100644 --- a/.gitignore +++ b/.gitignore @@ -147,3 +147,7 @@ cmake-build-debug/ # Benchmark outputs BenchmarkDotNet.Artifacts/ + +# pyenv config +.python-version + diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 2912ba113c9..301a2bea23b 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -20,7 +20,6 @@ import os.path import platform import re import shutil -import subprocess import sys import setuptools @@ -125,7 +124,10 @@ class TestAio(setuptools.Command): import tests loader = tests.Loader() loader.loadTestsFromNames(['tests_aio']) - runner = tests.Runner() + # Even without dedicated threads, the framework will somehow spawn a + # new thread for tests to run upon. New thread doesn't have event loop + # attached by default, so initialization is needed. + runner = tests.Runner(dedicated_threads=False) result = runner.run(loader.suite) if not result.wasSuccessful(): sys.exit('Test failure') diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py index 9ef0f176840..804b1a6e331 100644 --- a/src/python/grpcio_tests/tests/_runner.py +++ b/src/python/grpcio_tests/tests/_runner.py @@ -117,8 +117,15 @@ class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])): class Runner(object): - def __init__(self): + def __init__(self, dedicated_threads=False): + """Constructs the Runner object. + + Args: + dedicated_threads: A bool indicates whether to spawn each unit test + in separate thread or not. + """ self._skipped_tests = [] + self._dedicated_threads = dedicated_threads def skip_tests(self, tests): self._skipped_tests = tests @@ -194,24 +201,31 @@ class Runner(object): sys.stdout.write('Running {}\n'.format( augmented_case.case.id())) sys.stdout.flush() - case_thread = threading.Thread( - target=augmented_case.case.run, args=(result,)) - try: - with stdout_pipe, stderr_pipe: - case_thread.start() - while case_thread.is_alive(): - check_kill_self() - time.sleep(0) - case_thread.join() - except: # pylint: disable=try-except-raise - # re-raise the exception after forcing the with-block to end - raise - result.set_output(augmented_case.case, stdout_pipe.output(), - stderr_pipe.output()) - sys.stdout.write(result_out.getvalue()) - sys.stdout.flush() - result_out.truncate(0) - check_kill_self() + if self._dedicated_threads: + # (Deprecated) Spawns dedicated thread for each test case. + case_thread = threading.Thread( + target=augmented_case.case.run, args=(result,)) + try: + with stdout_pipe, stderr_pipe: + case_thread.start() + # If the thread is exited unexpected, stop testing. + while case_thread.is_alive(): + check_kill_self() + time.sleep(0) + case_thread.join() + except: # pylint: disable=try-except-raise + # re-raise the exception after forcing the with-block to end + raise + # Records the result of the test case run. + result.set_output(augmented_case.case, stdout_pipe.output(), + stderr_pipe.output()) + sys.stdout.write(result_out.getvalue()) + sys.stdout.flush() + result_out.truncate(0) + check_kill_self() + else: + # Donates current thread to test case execution. + augmented_case.case.run(result) result.stopTestRun() stdout_pipe.close() stderr_pipe.close() diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index 23439717b18..0fab86e49bc 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -2,5 +2,6 @@ "_sanity._sanity_test.AioSanityTest", "unit.channel_test.TestChannel", "unit.init_test.TestAioRpcError", - "unit.init_test.TestInsecureChannel" + "unit.init_test.TestInsecureChannel", + "unit.server_test.TestServer" ] diff --git a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel index c6d0a9f7728..82a12f9b7e4 100644 --- a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel @@ -17,9 +17,26 @@ package( default_visibility = ["//visibility:public"], ) -GRPC_ASYNC_TESTS = [ - "server_test.py", -] +GRPC_ASYNC_TESTS = glob(["*_test.py"]) + + +py_library( + name = "_test_base", + srcs_version = "PY3", + srcs = ["_test_base.py"], +) + +py_library( + name = "_test_server", + srcs_version = "PY3", + srcs = ["_test_server.py"], + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/proto/grpc/testing:py_messages_proto", + "//src/proto/grpc/testing:test_py_pb2_grpc", + "//src/proto/grpc/testing:empty_py_pb2", + ] +) [ py_test( @@ -29,10 +46,13 @@ GRPC_ASYNC_TESTS = [ main=test_file_name, python_version="PY3", deps=[ + ":_test_server", + ":_test_base", "//src/python/grpcio/grpc:grpcio", "//src/proto/grpc/testing:py_messages_proto", "//src/proto/grpc/testing:benchmark_service_py_pb2_grpc", "//src/proto/grpc/testing:benchmark_service_py_pb2", + "//src/python/grpcio_tests/tests/unit/framework/common", "//external:six" ], imports=["../../",], diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_base.py b/src/python/grpcio_tests/tests_aio/unit/_test_base.py new file mode 100644 index 00000000000..61602259043 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/_test_base.py @@ -0,0 +1,29 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import unittest +from grpc.experimental import aio + + +class AioTestBase(unittest.TestCase): + + def setUp(self): + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + aio.init_grpc_aio() + + @property + def loop(self): + return self._loop diff --git a/src/python/grpcio_tests/tests_aio/unit/sync_server.py b/src/python/grpcio_tests/tests_aio/unit/_test_server.py similarity index 51% rename from src/python/grpcio_tests/tests_aio/unit/sync_server.py rename to src/python/grpcio_tests/tests_aio/unit/_test_server.py index 82def8ed4c0..5f3661f42cf 100644 --- a/src/python/grpcio_tests/tests_aio/unit/sync_server.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_server.py @@ -1,4 +1,4 @@ -# Copyright 2019 gRPC authors. +# Copyright 2019 The gRPC Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,38 +18,27 @@ from concurrent import futures from time import sleep import grpc +from grpc.experimental import aio from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import test_pb2_grpc from tests.unit.framework.common import test_constants -# TODO (https://github.com/grpc/grpc/issues/19762) -# Change for an asynchronous server version once it's implemented. -class TestServiceServicer(test_pb2_grpc.TestServiceServicer): +class _TestServiceServicer(test_pb2_grpc.TestServiceServicer): - def UnaryCall(self, request, context): + async def UnaryCall(self, request, context): return messages_pb2.SimpleResponse() - def EmptyCall(self, request, context): + async def EmptyCall(self, request, context): while True: sleep(test_constants.LONG_TIMEOUT) -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Synchronous gRPC server.') - parser.add_argument( - '--host_and_port', - required=True, - type=str, - nargs=1, - help='the host and port to listen.') - args = parser.parse_args() - - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=1), - options=(('grpc.so_reuseport', 1),)) - test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), +async def start_test_server(): + server = aio.server(options=(('grpc.so_reuseport', 0),)) + test_pb2_grpc.add_TestServiceServicer_to_server(_TestServiceServicer(), server) - server.add_insecure_port(args.host_and_port[0]) - server.start() - server.wait_for_termination() + port = server.add_insecure_port('[::]:0') + await server.start() + # NOTE(lidizheng) returning the server to prevent it from deallocation + return 'localhost:%d' % port, server diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index cdf7a4cd49a..e18b6da6d39 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -12,23 +12,27 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import logging import unittest import grpc from grpc.experimental import aio -from tests_aio.unit import test_base from src.proto.grpc.testing import messages_pb2 from tests.unit.framework.common import test_constants +from tests_aio.unit._test_server import start_test_server +from tests_aio.unit._test_base import AioTestBase -class TestChannel(test_base.AioTestBase): +class TestChannel(AioTestBase): def test_async_context(self): async def coro(): - async with aio.insecure_channel(self.server_target) as channel: + server_target, unused_server = await start_test_server() + + async with aio.insecure_channel(server_target) as channel: hi = channel.unary_unary( '/grpc.testing.TestService/UnaryCall', request_serializer=messages_pb2.SimpleRequest. @@ -42,7 +46,9 @@ class TestChannel(test_base.AioTestBase): def test_unary_unary(self): async def coro(): - channel = aio.insecure_channel(self.server_target) + server_target, unused_server = await start_test_server() + + channel = aio.insecure_channel(server_target) hi = channel.unary_unary( '/grpc.testing.TestService/UnaryCall', request_serializer=messages_pb2.SimpleRequest.SerializeToString, @@ -58,7 +64,9 @@ class TestChannel(test_base.AioTestBase): def test_unary_call_times_out(self): async def coro(): - async with aio.insecure_channel(self.server_target) as channel: + server_target, unused_server = await start_test_server() + + async with aio.insecure_channel(server_target) as channel: empty_call_with_sleep = channel.unary_unary( "/grpc.testing.TestService/EmptyCall", request_serializer=messages_pb2.SimpleRequest. diff --git a/src/python/grpcio_tests/tests_aio/unit/init_test.py b/src/python/grpcio_tests/tests_aio/unit/init_test.py index 8371d44574e..297f178ee44 100644 --- a/src/python/grpcio_tests/tests_aio/unit/init_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/init_test.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import logging import unittest import grpc from grpc.experimental import aio -from tests_aio.unit import test_base +from tests_aio.unit._test_server import start_test_server +from tests_aio.unit._test_base import AioTestBase class TestAioRpcError(unittest.TestCase): @@ -59,12 +61,14 @@ class TestAioRpcError(unittest.TestCase): second_aio_rpc_error.__class__) -class TestInsecureChannel(test_base.AioTestBase): +class TestInsecureChannel(AioTestBase): def test_insecure_channel(self): async def coro(): - channel = aio.insecure_channel(self.server_target) + server_target, unused_server = await start_test_server() + + channel = aio.insecure_channel(server_target) self.assertIsInstance(channel, aio.Channel) self.loop.run_until_complete(coro()) diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 2d543893176..15f4ff182d6 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -20,6 +20,7 @@ import grpc from grpc.experimental import aio from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import benchmark_service_pb2_grpc +from tests_aio.unit._test_base import AioTestBase _TEST_METHOD_PATH = '' @@ -37,10 +38,9 @@ class GenericHandler(grpc.GenericRpcHandler): return grpc.unary_unary_rpc_method_handler(unary_unary) -class TestServer(unittest.TestCase): +class TestServer(AioTestBase): def test_unary_unary(self): - loop = asyncio.get_event_loop() async def test_unary_unary_body(): server = aio.server() @@ -53,10 +53,9 @@ class TestServer(unittest.TestCase): response = await unary_call(_REQUEST) self.assertEqual(response, _RESPONSE) - loop.run_until_complete(test_unary_unary_body()) + self.loop.run_until_complete(test_unary_unary_body()) if __name__ == '__main__': - aio.init_grpc_aio() logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/test_base.py b/src/python/grpcio_tests/tests_aio/unit/test_base.py deleted file mode 100644 index 0b325523e0f..00000000000 --- a/src/python/grpcio_tests/tests_aio/unit/test_base.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright 2019 The gRPC Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys -import subprocess - -import asyncio -import unittest -import socket - -from grpc.experimental import aio -from tests_aio.unit import sync_server - - -def _get_free_loopback_tcp_port(): - if socket.has_ipv6: - tcp_socket = socket.socket(socket.AF_INET6) - host = "::1" - host_target = "[::1]" - else: - tcp_socket = socket.socket(socket.AF_INET) - host = "127.0.0.1" - host_target = host - tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - tcp_socket.bind((host, 0)) - address_tuple = tcp_socket.getsockname() - return tcp_socket, "%s:%s" % (host_target, address_tuple[1]) - - -class _Server: - """_Server is an wrapper for a sync-server subprocess. - - The synchronous server is executed in another process which initializes - implicitly the grpc using the synchronous configuration. Both worlds - can not coexist within the same process. - """ - - def __init__(self, host_and_port): # pylint: disable=W0621 - self._host_and_port = host_and_port - self._handle = None - - def start(self): - assert self._handle is None - - try: - from google3.pyglib import resources - executable = resources.GetResourceFilename( - "google3/third_party/py/grpc/sync_server") - args = [executable, '--host_and_port', self._host_and_port] - except ImportError: - executable = sys.executable - directory, _ = os.path.split(os.path.abspath(__file__)) - filename = directory + '/sync_server.py' - args = [ - executable, filename, '--host_and_port', self._host_and_port - ] - - self._handle = subprocess.Popen(args) - - def terminate(self): - if not self._handle: - return - - self._handle.terminate() - self._handle.wait() - self._handle = None - - -class AioTestBase(unittest.TestCase): - - def setUp(self): - self._socket, self._target = _get_free_loopback_tcp_port() - self._server = _Server(self._target) - self._server.start() - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) - aio.init_grpc_aio() - - def tearDown(self): - self._server.terminate() - self._socket.close() - - @property - def loop(self): - return self._loop - - @property - def server_target(self): - return self._target diff --git a/tools/run_tests/helper_scripts/build_python.sh b/tools/run_tests/helper_scripts/build_python.sh index 7cd1ef9d517..87c16a2ec4f 100755 --- a/tools/run_tests/helper_scripts/build_python.sh +++ b/tools/run_tests/helper_scripts/build_python.sh @@ -122,7 +122,8 @@ export LANG=en_US.UTF-8 # Allow build_ext to build C/C++ files in parallel # by enabling a monkeypatch. It speeds up the build a lot. -export GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS=4 +DEFAULT_PARALLEL_JOBS=$(nproc) || DEFAULT_PARALLEL_JOBS=4 +export GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS=${GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS:-$DEFAULT_PARALLEL_JOBS} # If ccache is available on Linux, use it. if [ "$(is_linux)" ]; then diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 33b455030c0..59edaaed4ed 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -705,9 +705,16 @@ class PythonConfig( class PythonLanguage(object): - _DEFAULT_COMMAND = 'test_lite' - _TEST_SPECS_FILE = 'src/python/grpcio_tests/tests/tests.json' - _TEST_FOLDER = 'test' + _TEST_SPECS_FILE = { + 'native': 'src/python/grpcio_tests/tests/tests.json', + 'gevent': 'src/python/grpcio_tests/tests/tests.json', + 'asyncio': 'src/python/grpcio_tests/tests_aio/tests.json', + } + _TEST_FOLDER = { + 'native': 'test', + 'gevent': 'test', + 'asyncio': 'test_aio', + } def configure(self, config, args): self.config = config @@ -716,7 +723,8 @@ class PythonLanguage(object): def test_specs(self): # load list of known test suites - with open(self._TEST_SPECS_FILE) as tests_json_file: + with open(self._TEST_SPECS_FILE[ + self.args.iomgr_platform]) as tests_json_file: tests_json = json.load(tests_json_file) environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS) return [ @@ -726,8 +734,9 @@ class PythonLanguage(object): environ=dict( list(environment.items()) + [( 'GRPC_PYTHON_TESTRUNNER_FILTER', str(suite_name))]), - shortname='%s.%s.%s' % (config.name, self._TEST_FOLDER, - suite_name), + shortname='%s.%s.%s' % + (config.name, self._TEST_FOLDER[self.args.iomgr_platform], + suite_name), ) for suite_name in tests_json for config in self.pythons ] @@ -795,9 +804,17 @@ class PythonLanguage(object): venv_relative_python = ['bin/python'] toolchain = ['unix'] - test_command = self._DEFAULT_COMMAND - if args.iomgr_platform == 'gevent': + # Selects the corresponding testing mode. + # See src/python/grpcio_tests/commands.py for implementation details. + if args.iomgr_platform == 'native': + test_command = 'test_lite' + elif args.iomgr_platform == 'gevent': test_command = 'test_gevent' + elif args.iomgr_platform == 'asyncio': + test_command = 'test_aio' + else: + raise ValueError( + 'Unsupported IO Manager platform: %s' % args.iomgr_platform) runner = [ os.path.abspath('tools/run_tests/helper_scripts/run_python.sh') ] @@ -846,15 +863,25 @@ class PythonLanguage(object): pypy32_config = _pypy_config_generator( name='pypy3', major='3', config_vars=config_vars) + if args.iomgr_platform == 'asyncio': + if args.compiler not in ('default', 'python3.6', 'python3.7', + 'python3.8'): + raise Exception( + 'Compiler %s not supported with IO Manager platform: %s' % + (args.compiler, args.iomgr_platform)) + if args.compiler == 'default': if os.name == 'nt': return (python35_config,) else: - return ( - python27_config, - python36_config, - python37_config, - ) + if args.iomgr_platform == 'asyncio': + return (python36_config,) + else: + return ( + python27_config, + python36_config, + python37_config, + ) elif args.compiler == 'python2.7': return (python27_config,) elif args.compiler == 'python3.4': @@ -889,31 +916,6 @@ class PythonLanguage(object): return 'python' -class PythonAioLanguage(PythonLanguage): - - _DEFAULT_COMMAND = 'test_aio' - _TEST_SPECS_FILE = 'src/python/grpcio_tests/tests_aio/tests.json' - _TEST_FOLDER = 'test_aio' - - def configure(self, config, args): - self.config = config - self.args = args - self.pythons = self._get_pythons(self.args) - - def _get_pythons(self, args): - """Get python runtimes to test with, based on current platform, architecture, compiler etc.""" - - if args.compiler not in ('python3.6', 'python3.7', 'python3.8'): - raise Exception('Compiler %s not supported.' % args.compiler) - if args.iomgr_platform not in ('native'): - raise Exception( - 'Iomgr platform %s not supported.' % args.iomgr_platform) - return super()._get_pythons(args) - - def __str__(self): - return 'python_aio' - - class RubyLanguage(object): def configure(self, config, args): @@ -1321,7 +1323,6 @@ _LANGUAGES = { 'php': PhpLanguage(), 'php7': Php7Language(), 'python': PythonLanguage(), - 'python-aio': PythonAioLanguage(), 'ruby': RubyLanguage(), 'csharp': CSharpLanguage(), 'objc': ObjCLanguage(), @@ -1489,7 +1490,7 @@ argp.add_argument( ) argp.add_argument( '--iomgr_platform', - choices=['native', 'uv', 'gevent'], + choices=['native', 'uv', 'gevent', 'asyncio'], default='native', help='Selects iomgr platform to build on') argp.add_argument( diff --git a/tools/run_tests/run_tests_matrix.py b/tools/run_tests/run_tests_matrix.py index 99f95f4912a..4b90ae76a4a 100755 --- a/tools/run_tests/run_tests_matrix.py +++ b/tools/run_tests/run_tests_matrix.py @@ -231,7 +231,7 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS): languages=['python'], configs=['opt'], platforms=['linux', 'macos', 'windows'], - iomgr_platforms=['native', 'gevent'], + iomgr_platforms=['native', 'gevent', 'asyncio'], labels=['basictests', 'multilang'], extra_args=extra_args + ['--report_multi_target'], inner_jobs=inner_jobs)