diff --git a/src/core/lib/iomgr/pollset_custom.cc b/src/core/lib/iomgr/pollset_custom.cc index 62c4b397dd0..1165ea889de 100644 --- a/src/core/lib/iomgr/pollset_custom.cc +++ b/src/core/lib/iomgr/pollset_custom.cc @@ -77,14 +77,14 @@ static grpc_error_handle pollset_work(grpc_pollset* pollset, // control back to the application grpc_core::ExecCtx* curr = grpc_core::ExecCtx::Get(); grpc_core::ExecCtx::Set(nullptr); - poller_vtable->poll(static_cast(timeout)); + grpc_error* err = poller_vtable->poll(static_cast(timeout)); grpc_core::ExecCtx::Set(curr); grpc_core::ExecCtx::Get()->InvalidateNow(); if (grpc_core::ExecCtx::Get()->HasWork()) { grpc_core::ExecCtx::Get()->Flush(); } gpr_mu_lock(&pollset->mu); - return GRPC_ERROR_NONE; + return err; } static grpc_error_handle pollset_kick( diff --git a/src/core/lib/iomgr/pollset_custom.h b/src/core/lib/iomgr/pollset_custom.h index 9e2027f7f4a..de3067f48ea 100644 --- a/src/core/lib/iomgr/pollset_custom.h +++ b/src/core/lib/iomgr/pollset_custom.h @@ -21,11 +21,13 @@ #include +#include "src/core/lib/iomgr/error.h" + #include typedef struct grpc_custom_poller_vtable { void (*init)(); - void (*poll)(size_t timeout_ms); + grpc_error* (*poll)(size_t timeout_ms); void (*kick)(); void (*shutdown)(); } grpc_custom_poller_vtable; diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc index bade6eae6c5..ef76870723f 100644 --- a/src/core/lib/iomgr/pollset_uv.cc +++ b/src/core/lib/iomgr/pollset_uv.cc @@ -24,6 +24,7 @@ #include #include +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/pollset_custom.h" #include @@ -54,7 +55,7 @@ static void empty_timer_cb(uv_timer_t* handle) {} static void kick_timer_cb(uv_timer_t* handle) { g_kicked = false; } -static void run_loop(size_t timeout) { +static grpc_error* run_loop(size_t timeout) { if (grpc_pollset_work_run_loop) { if (timeout == 0) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); @@ -64,6 +65,7 @@ static void run_loop(size_t timeout) { uv_timer_stop(&g_handle->poll_timer); } } + return GRPC_ERROR_NONE; } static void kick() { diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h index de82bcc1d31..84fdefcc169 100644 --- a/src/core/lib/iomgr/pollset_uv.h +++ b/src/core/lib/iomgr/pollset_uv.h @@ -19,11 +19,15 @@ #ifndef GRPC_CORE_LIB_IOMGR_POLLSET_UV_H #define GRPC_CORE_LIB_IOMGR_POLLSET_UV_H +#include + +#include "src/core/lib/iomgr/error.h" + extern int grpc_pollset_work_run_loop; typedef struct grpc_custom_poller_vtable { void (*init)(void); - void (*run_loop)(int blocking); + grpc_error* (*run_loop)(int blocking); } grpc_custom_poller_vtable; void grpc_custom_pollset_global_init(grpc_custom_poller_vtable* vtable); diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 2b889e0de5c..cdb7f80dc60 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -1065,7 +1065,11 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, gpr_log(GPR_ERROR, "Completion queue next failed: %s", grpc_error_std_string(err).c_str()); GRPC_ERROR_UNREF(err); - ret.type = GRPC_QUEUE_TIMEOUT; + if (err == GRPC_ERROR_CANCELLED) { + ret.type = GRPC_QUEUE_SHUTDOWN; + } else { + ret.type = GRPC_QUEUE_TIMEOUT; + } ret.success = 0; dump_pending_tags(cq); break; diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 86171def061..09232767544 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -208,8 +208,8 @@ cdef void asyncio_kick_loop() with gil: pass -cdef void asyncio_run_loop(size_t timeout_ms) with gil: - pass +cdef grpc_error* asyncio_run_loop(size_t timeout_ms) with gil: + return grpc_error_none() def _auth_plugin_callback_wrapper(object cb, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 54c55f85f12..cb5fad9ebf6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -31,7 +31,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) if gpr_time_cmp(c_timeout, c_deadline) > 0: c_timeout = c_deadline - + c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL) if (c_event.type != GRPC_QUEUE_TIMEOUT or diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index fb8ceae6a5d..2ef4eb26834 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -699,3 +699,7 @@ cdef extern from "grpc/grpc_security_constants.h": ctypedef enum grpc_local_connect_type: UDS LOCAL_TCP + + +cdef extern from "src/core/lib/iomgr/error.h": + const grpc_error* GRPC_ERROR_CANCELLED diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi index c86ff2f1ff1..22b476628ad 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi @@ -15,6 +15,7 @@ from libc cimport string import errno +import sys gevent_g = None gevent_socket = None gevent_hub = None @@ -348,12 +349,24 @@ cdef void destroy_loop() with gil: cdef void kick_loop() with gil: g_event.set() -cdef void run_loop(size_t timeout_ms) with gil: - timeout = timeout_ms / 1000.0 - if timeout_ms > 0: +def _run_loop(timeout_ms): + timeout = timeout_ms / 1000.0 + if timeout_ms > 0: + try: g_event.wait(timeout) + finally: g_event.clear() +cdef grpc_error* run_loop(size_t timeout_ms) with gil: + try: + _run_loop(timeout_ms) + return grpc_error_none() + except BaseException: + exc_info = sys.exc_info() + # Avoid running any Python code after setting the exception + cpython.PyErr_SetObject(exc_info[0], exc_info[1]) + return GRPC_ERROR_CANCELLED + ############################### ### Initializer ############### ############################### diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi index 9cf3113bbf7..b427554293c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi @@ -110,7 +110,7 @@ cdef extern from "src/core/lib/iomgr/timer_custom.h": cdef extern from "src/core/lib/iomgr/pollset_custom.h": struct grpc_custom_poller_vtable: void (*init)() - void (*poll)(size_t timeout_ms) + grpc_error* (*poll)(size_t timeout_ms) void (*kick)() void (*shutdown)() diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index f5f3849ad2c..ab7dbc5caa0 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -257,7 +257,7 @@ class TestGevent(setuptools.Command): import tests loader = tests.Loader() - loader.loadTestsFromNames(['tests']) + loader.loadTestsFromNames(['tests', 'tests_gevent']) runner = tests.Runner() if sys.platform == 'win32': runner.skip_tests(self.BANNED_TESTS + self.BANNED_WINDOWS_TESTS) diff --git a/src/python/grpcio_tests/tests_gevent/__init__.py b/src/python/grpcio_tests/tests_gevent/__init__.py new file mode 100644 index 00000000000..712a2e1de2a --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_gevent/tests.json b/src/python/grpcio_tests/tests_gevent/tests.json new file mode 100644 index 00000000000..11307dcb116 --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/tests.json @@ -0,0 +1 @@ +["unit.close_channel_test.CloseChannelTest"] diff --git a/src/python/grpcio_tests/tests_gevent/unit/__init__.py b/src/python/grpcio_tests/tests_gevent/unit/__init__.py new file mode 100644 index 00000000000..712a2e1de2a --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/unit/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_gevent/unit/_test_server.py b/src/python/grpcio_tests/tests_gevent/unit/_test_server.py new file mode 100644 index 00000000000..f1ba23e093f --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/unit/_test_server.py @@ -0,0 +1,58 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures +from src.proto.grpc.testing import messages_pb2, test_pb2_grpc +import grpc +import gevent +from typing import Any, Tuple + +LONG_UNARY_CALL_WITH_SLEEP_VALUE = 1 + + +class TestServiceServicer(test_pb2_grpc.TestServiceServicer): + + def UnaryCall(self, request, context): + return messages_pb2.SimpleResponse() + + def UnaryCallWithSleep(self, unused_request, unused_context): + gevent.sleep(LONG_UNARY_CALL_WITH_SLEEP_VALUE) + return messages_pb2.SimpleResponse() + + +def start_test_server(port: int = 0) -> Tuple[str, Any]: + server = grpc.server(futures.ThreadPoolExecutor()) + servicer = TestServiceServicer() + test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), + server) + + server.add_generic_rpc_handlers((_create_extra_generic_handler(servicer),)) + port = server.add_insecure_port('[::]:%d' % port) + server.start() + return 'localhost:%d' % port, server + + +def _create_extra_generic_handler(servicer: TestServiceServicer) -> Any: + # Add programatically extra methods not provided by the proto file + # that are used during the tests + rpc_method_handlers = { + 'UnaryCallWithSleep': + grpc.unary_unary_rpc_method_handler( + servicer.UnaryCallWithSleep, + request_deserializer=messages_pb2.SimpleRequest.FromString, + response_serializer=messages_pb2.SimpleResponse. + SerializeToString) + } + return grpc.method_handlers_generic_handler('grpc.testing.TestService', + rpc_method_handlers) diff --git a/src/python/grpcio_tests/tests_gevent/unit/close_channel_test.py b/src/python/grpcio_tests/tests_gevent/unit/close_channel_test.py new file mode 100644 index 00000000000..54f13611dab --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/unit/close_channel_test.py @@ -0,0 +1,102 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from src.proto.grpc.testing import messages_pb2, test_pb2_grpc +import grpc +import gevent +import sys +from gevent.pool import Group +from tests_gevent.unit._test_server import start_test_server + +_UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep' + + +class CloseChannelTest(unittest.TestCase): + + def setUp(self): + self._server_target, self._server = start_test_server() + self._channel = grpc.insecure_channel(self._server_target) + self._unhandled_exception = False + sys.excepthook = self._global_exception_handler + + def tearDown(self): + self._channel.close() + self._server.stop(None) + + def test_graceful_close(self): + stub = test_pb2_grpc.TestServiceStub(self._channel) + _, response = stub.UnaryCall.with_call(messages_pb2.SimpleRequest()) + + self._channel.close() + + self.assertEqual(grpc.StatusCode.OK, response.code()) + + def test_graceful_close_in_greenlet(self): + group = Group() + stub = test_pb2_grpc.TestServiceStub(self._channel) + greenlet = group.spawn(self._run_client, stub.UnaryCall) + # release loop so that greenlet can take control + gevent.sleep() + self._channel.close() + group.killone(greenlet) + self.assertFalse(self._unhandled_exception, "Unhandled GreenletExit") + try: + greenlet.get() + except Exception as e: # pylint: disable=broad-except + self.fail(f"Unexpected exception in greenlet: {e}") + + def test_ungraceful_close_in_greenlet(self): + group = Group() + UnaryCallWithSleep = self._channel.unary_unary( + _UNARY_CALL_METHOD_WITH_SLEEP, + request_serializer=messages_pb2.SimpleRequest.SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString, + ) + greenlet = group.spawn(self._run_client, UnaryCallWithSleep) + # release loop so that greenlet can take control + gevent.sleep() + group.killone(greenlet) + self.assertFalse(self._unhandled_exception, "Unhandled GreenletExit") + + def test_kill_greenlet_with_generic_exception(self): + group = Group() + UnaryCallWithSleep = self._channel.unary_unary( + _UNARY_CALL_METHOD_WITH_SLEEP, + request_serializer=messages_pb2.SimpleRequest.SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString, + ) + greenlet = group.spawn(self._run_client, UnaryCallWithSleep) + # release loop so that greenlet can take control + gevent.sleep() + group.killone(greenlet, exception=Exception) + self.assertFalse(self._unhandled_exception, "Unhandled exception") + self.assertRaises(Exception, greenlet.get) + + def _run_client(self, call): + try: + call.with_call(messages_pb2.SimpleRequest()) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.CANCELLED: + raise + + def _global_exception_handler(self, exctype, value, tb): + if exctype == gevent.GreenletExit: + self._unhandled_exception = True + return + sys.__excepthook__(exctype, value, tb) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tools/distrib/pylint_code.sh b/tools/distrib/pylint_code.sh index 060c2b7ffd9..83faa932ba9 100755 --- a/tools/distrib/pylint_code.sh +++ b/tools/distrib/pylint_code.sh @@ -32,6 +32,7 @@ DIRS=( TEST_DIRS=( 'src/python/grpcio_tests/tests' + 'src/python/grpcio_tests/tests_gevent' ) VIRTUALENV=python_pylint_venv diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 5176331c3fb..afb96226dbb 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -622,13 +622,16 @@ class PythonConfig( class PythonLanguage(object): _TEST_SPECS_FILE = { - 'native': 'src/python/grpcio_tests/tests/tests.json', - 'gevent': 'src/python/grpcio_tests/tests/tests.json', - 'asyncio': 'src/python/grpcio_tests/tests_aio/tests.json', + 'native': ['src/python/grpcio_tests/tests/tests.json'], + 'gevent': [ + 'src/python/grpcio_tests/tests/tests.json', + 'src/python/grpcio_tests/tests_gevent/tests.json', + ], + 'asyncio': ['src/python/grpcio_tests/tests_aio/tests.json'], } _TEST_FOLDER = { 'native': 'test', - 'gevent': 'test', + 'gevent': 'test_gevent', 'asyncio': 'test_aio', } @@ -639,9 +642,11 @@ class PythonLanguage(object): def test_specs(self): # load list of known test suites - with open(self._TEST_SPECS_FILE[ - self.args.iomgr_platform]) as tests_json_file: - tests_json = json.load(tests_json_file) + tests_json = [] + for tests_json_file_name in self._TEST_SPECS_FILE[ + self.args.iomgr_platform]: + with open(tests_json_file_name) as tests_json_file: + tests_json.extend(json.load(tests_json_file)) environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS) # TODO(https://github.com/grpc/grpc/issues/21401) Fork handlers is not # designed for non-native IO manager. It has a side-effect that @@ -773,7 +778,7 @@ class PythonLanguage(object): major='3', config_vars=config_vars) - if args.iomgr_platform == 'asyncio': + if args.iomgr_platform in ('asyncio', 'gevent'): if args.compiler not in ('default', 'python3.6', 'python3.7', 'python3.8'): raise Exception( @@ -789,7 +794,7 @@ class PythonLanguage(object): else: return (python38_config,) else: - if args.iomgr_platform == 'asyncio': + if args.iomgr_platform in ('asyncio', 'gevent'): return (python36_config, python38_config) elif os.uname()[0] == 'Darwin': # NOTE(rbellevi): Testing takes significantly longer on