From aaa7f13b171e9c4043ce7701229d5d4adfd8367b Mon Sep 17 00:00:00 2001 From: Isabel Andrade <67916678+beandrad@users.noreply.github.com> Date: Mon, 24 May 2021 23:03:29 +0100 Subject: [PATCH] Handle gevent exception in gevent poller (#26058) * Handle gevent exception in gevent poller Currently the gevent poller ignores exceptions raised by `gevent.wait()`, which causes greenlets to be unkillable while waiting. This change handles exceptions raised while waiting in the gevent poller, cancels the gRPC call and propagates the error back to the application. Co-authored-by: Kostis Lolos * Fix imports in header files * Lint gevent tests * Set grpc event type to GRPC_QUEUE_SHUTDOWN upon cancel error To prevent `grpc_completion_queue_next()` from being called indefinitely when the queue is shut down. * Remove unnecessary `except *` * Improve gevent tests * Format code * Remove unnecessary import Co-authored-by: Kostis Lolos --- src/core/lib/iomgr/pollset_custom.cc | 4 +- src/core/lib/iomgr/pollset_custom.h | 4 +- src/core/lib/iomgr/pollset_uv.cc | 4 +- src/core/lib/iomgr/pollset_uv.h | 6 +- src/core/lib/surface/completion_queue.cc | 6 +- .../_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi | 4 +- .../_cython/_cygrpc/completion_queue.pyx.pxi | 2 +- .../grpcio/grpc/_cython/_cygrpc/grpc.pxi | 4 + .../grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi | 19 +++- .../grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi | 2 +- src/python/grpcio_tests/commands.py | 2 +- .../grpcio_tests/tests_gevent/__init__.py | 13 +++ .../grpcio_tests/tests_gevent/tests.json | 1 + .../tests_gevent/unit/__init__.py | 13 +++ .../tests_gevent/unit/_test_server.py | 58 ++++++++++ .../tests_gevent/unit/close_channel_test.py | 102 ++++++++++++++++++ tools/distrib/pylint_code.sh | 1 + tools/run_tests/run_tests.py | 23 ++-- 18 files changed, 245 insertions(+), 23 deletions(-) create mode 100644 src/python/grpcio_tests/tests_gevent/__init__.py create mode 100644 src/python/grpcio_tests/tests_gevent/tests.json create mode 100644 
src/python/grpcio_tests/tests_gevent/unit/__init__.py create mode 100644 src/python/grpcio_tests/tests_gevent/unit/_test_server.py create mode 100644 src/python/grpcio_tests/tests_gevent/unit/close_channel_test.py diff --git a/src/core/lib/iomgr/pollset_custom.cc b/src/core/lib/iomgr/pollset_custom.cc index 62c4b397dd0..1165ea889de 100644 --- a/src/core/lib/iomgr/pollset_custom.cc +++ b/src/core/lib/iomgr/pollset_custom.cc @@ -77,14 +77,14 @@ static grpc_error_handle pollset_work(grpc_pollset* pollset, // control back to the application grpc_core::ExecCtx* curr = grpc_core::ExecCtx::Get(); grpc_core::ExecCtx::Set(nullptr); - poller_vtable->poll(static_cast(timeout)); + grpc_error* err = poller_vtable->poll(static_cast(timeout)); grpc_core::ExecCtx::Set(curr); grpc_core::ExecCtx::Get()->InvalidateNow(); if (grpc_core::ExecCtx::Get()->HasWork()) { grpc_core::ExecCtx::Get()->Flush(); } gpr_mu_lock(&pollset->mu); - return GRPC_ERROR_NONE; + return err; } static grpc_error_handle pollset_kick( diff --git a/src/core/lib/iomgr/pollset_custom.h b/src/core/lib/iomgr/pollset_custom.h index 9e2027f7f4a..de3067f48ea 100644 --- a/src/core/lib/iomgr/pollset_custom.h +++ b/src/core/lib/iomgr/pollset_custom.h @@ -21,11 +21,13 @@ #include +#include "src/core/lib/iomgr/error.h" + #include typedef struct grpc_custom_poller_vtable { void (*init)(); - void (*poll)(size_t timeout_ms); + grpc_error* (*poll)(size_t timeout_ms); void (*kick)(); void (*shutdown)(); } grpc_custom_poller_vtable; diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc index bade6eae6c5..ef76870723f 100644 --- a/src/core/lib/iomgr/pollset_uv.cc +++ b/src/core/lib/iomgr/pollset_uv.cc @@ -24,6 +24,7 @@ #include #include +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/pollset_custom.h" #include @@ -54,7 +55,7 @@ static void empty_timer_cb(uv_timer_t* handle) {} static void kick_timer_cb(uv_timer_t* handle) { g_kicked = false; } -static void run_loop(size_t timeout) { 
+static grpc_error* run_loop(size_t timeout) { if (grpc_pollset_work_run_loop) { if (timeout == 0) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); @@ -64,6 +65,7 @@ static void run_loop(size_t timeout) { uv_timer_stop(&g_handle->poll_timer); } } + return GRPC_ERROR_NONE; } static void kick() { diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h index de82bcc1d31..84fdefcc169 100644 --- a/src/core/lib/iomgr/pollset_uv.h +++ b/src/core/lib/iomgr/pollset_uv.h @@ -19,11 +19,15 @@ #ifndef GRPC_CORE_LIB_IOMGR_POLLSET_UV_H #define GRPC_CORE_LIB_IOMGR_POLLSET_UV_H +#include + +#include "src/core/lib/iomgr/error.h" + extern int grpc_pollset_work_run_loop; typedef struct grpc_custom_poller_vtable { void (*init)(void); - void (*run_loop)(int blocking); + grpc_error* (*run_loop)(int blocking); } grpc_custom_poller_vtable; void grpc_custom_pollset_global_init(grpc_custom_poller_vtable* vtable); diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 2b889e0de5c..cdb7f80dc60 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -1065,7 +1065,11 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, gpr_log(GPR_ERROR, "Completion queue next failed: %s", grpc_error_std_string(err).c_str()); GRPC_ERROR_UNREF(err); - ret.type = GRPC_QUEUE_TIMEOUT; + if (err == GRPC_ERROR_CANCELLED) { + ret.type = GRPC_QUEUE_SHUTDOWN; + } else { + ret.type = GRPC_QUEUE_TIMEOUT; + } ret.success = 0; dump_pending_tags(cq); break; diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 86171def061..09232767544 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -208,8 +208,8 @@ cdef void asyncio_kick_loop() with gil: pass -cdef void asyncio_run_loop(size_t timeout_ms) with gil: 
- pass +cdef grpc_error* asyncio_run_loop(size_t timeout_ms) with gil: + return grpc_error_none() def _auth_plugin_callback_wrapper(object cb, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 54c55f85f12..cb5fad9ebf6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -31,7 +31,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) if gpr_time_cmp(c_timeout, c_deadline) > 0: c_timeout = c_deadline - + c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL) if (c_event.type != GRPC_QUEUE_TIMEOUT or diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index fb8ceae6a5d..2ef4eb26834 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -699,3 +699,7 @@ cdef extern from "grpc/grpc_security_constants.h": ctypedef enum grpc_local_connect_type: UDS LOCAL_TCP + + +cdef extern from "src/core/lib/iomgr/error.h": + const grpc_error* GRPC_ERROR_CANCELLED diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi index c86ff2f1ff1..22b476628ad 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi @@ -15,6 +15,7 @@ from libc cimport string import errno +import sys gevent_g = None gevent_socket = None gevent_hub = None @@ -348,12 +349,24 @@ cdef void destroy_loop() with gil: cdef void kick_loop() with gil: g_event.set() -cdef void run_loop(size_t timeout_ms) with gil: - timeout = timeout_ms / 1000.0 - if timeout_ms > 0: +def _run_loop(timeout_ms): + timeout = timeout_ms / 1000.0 + if timeout_ms > 
0: + try: g_event.wait(timeout) + finally: g_event.clear() +cdef grpc_error* run_loop(size_t timeout_ms) with gil: + try: + _run_loop(timeout_ms) + return grpc_error_none() + except BaseException: + exc_info = sys.exc_info() + # Avoid running any Python code after setting the exception + cpython.PyErr_SetObject(exc_info[0], exc_info[1]) + return GRPC_ERROR_CANCELLED + ############################### ### Initializer ############### ############################### diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi index 9cf3113bbf7..b427554293c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi @@ -110,7 +110,7 @@ cdef extern from "src/core/lib/iomgr/timer_custom.h": cdef extern from "src/core/lib/iomgr/pollset_custom.h": struct grpc_custom_poller_vtable: void (*init)() - void (*poll)(size_t timeout_ms) + grpc_error* (*poll)(size_t timeout_ms) void (*kick)() void (*shutdown)() diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index f5f3849ad2c..ab7dbc5caa0 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -257,7 +257,7 @@ class TestGevent(setuptools.Command): import tests loader = tests.Loader() - loader.loadTestsFromNames(['tests']) + loader.loadTestsFromNames(['tests', 'tests_gevent']) runner = tests.Runner() if sys.platform == 'win32': runner.skip_tests(self.BANNED_TESTS + self.BANNED_WINDOWS_TESTS) diff --git a/src/python/grpcio_tests/tests_gevent/__init__.py b/src/python/grpcio_tests/tests_gevent/__init__.py new file mode 100644 index 00000000000..712a2e1de2a --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_gevent/tests.json b/src/python/grpcio_tests/tests_gevent/tests.json new file mode 100644 index 00000000000..11307dcb116 --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/tests.json @@ -0,0 +1 @@ +["unit.close_channel_test.CloseChannelTest"] diff --git a/src/python/grpcio_tests/tests_gevent/unit/__init__.py b/src/python/grpcio_tests/tests_gevent/unit/__init__.py new file mode 100644 index 00000000000..712a2e1de2a --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/unit/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/src/python/grpcio_tests/tests_gevent/unit/_test_server.py b/src/python/grpcio_tests/tests_gevent/unit/_test_server.py new file mode 100644 index 00000000000..f1ba23e093f --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/unit/_test_server.py @@ -0,0 +1,58 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures +from src.proto.grpc.testing import messages_pb2, test_pb2_grpc +import grpc +import gevent +from typing import Any, Tuple + +LONG_UNARY_CALL_WITH_SLEEP_VALUE = 1 + + +class TestServiceServicer(test_pb2_grpc.TestServiceServicer): + + def UnaryCall(self, request, context): + return messages_pb2.SimpleResponse() + + def UnaryCallWithSleep(self, unused_request, unused_context): + gevent.sleep(LONG_UNARY_CALL_WITH_SLEEP_VALUE) + return messages_pb2.SimpleResponse() + + +def start_test_server(port: int = 0) -> Tuple[str, Any]: + server = grpc.server(futures.ThreadPoolExecutor()) + servicer = TestServiceServicer() + test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), + server) + + server.add_generic_rpc_handlers((_create_extra_generic_handler(servicer),)) + port = server.add_insecure_port('[::]:%d' % port) + server.start() + return 'localhost:%d' % port, server + + +def _create_extra_generic_handler(servicer: TestServiceServicer) -> Any: + # Programmatically add extra methods not provided by the proto file + # that are used during the tests + rpc_method_handlers = { + 
'UnaryCallWithSleep': + grpc.unary_unary_rpc_method_handler( + servicer.UnaryCallWithSleep, + request_deserializer=messages_pb2.SimpleRequest.FromString, + response_serializer=messages_pb2.SimpleResponse. + SerializeToString) + } + return grpc.method_handlers_generic_handler('grpc.testing.TestService', + rpc_method_handlers) diff --git a/src/python/grpcio_tests/tests_gevent/unit/close_channel_test.py b/src/python/grpcio_tests/tests_gevent/unit/close_channel_test.py new file mode 100644 index 00000000000..54f13611dab --- /dev/null +++ b/src/python/grpcio_tests/tests_gevent/unit/close_channel_test.py @@ -0,0 +1,102 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from src.proto.grpc.testing import messages_pb2, test_pb2_grpc +import grpc +import gevent +import sys +from gevent.pool import Group +from tests_gevent.unit._test_server import start_test_server + +_UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep' + + +class CloseChannelTest(unittest.TestCase): + + def setUp(self): + self._server_target, self._server = start_test_server() + self._channel = grpc.insecure_channel(self._server_target) + self._unhandled_exception = False + sys.excepthook = self._global_exception_handler + + def tearDown(self): + self._channel.close() + self._server.stop(None) + + def test_graceful_close(self): + stub = test_pb2_grpc.TestServiceStub(self._channel) + _, response = stub.UnaryCall.with_call(messages_pb2.SimpleRequest()) + + self._channel.close() + + self.assertEqual(grpc.StatusCode.OK, response.code()) + + def test_graceful_close_in_greenlet(self): + group = Group() + stub = test_pb2_grpc.TestServiceStub(self._channel) + greenlet = group.spawn(self._run_client, stub.UnaryCall) + # release loop so that greenlet can take control + gevent.sleep() + self._channel.close() + group.killone(greenlet) + self.assertFalse(self._unhandled_exception, "Unhandled GreenletExit") + try: + greenlet.get() + except Exception as e: # pylint: disable=broad-except + self.fail(f"Unexpected exception in greenlet: {e}") + + def test_ungraceful_close_in_greenlet(self): + group = Group() + UnaryCallWithSleep = self._channel.unary_unary( + _UNARY_CALL_METHOD_WITH_SLEEP, + request_serializer=messages_pb2.SimpleRequest.SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString, + ) + greenlet = group.spawn(self._run_client, UnaryCallWithSleep) + # release loop so that greenlet can take control + gevent.sleep() + group.killone(greenlet) + self.assertFalse(self._unhandled_exception, "Unhandled GreenletExit") + + def test_kill_greenlet_with_generic_exception(self): + group = Group() + 
UnaryCallWithSleep = self._channel.unary_unary( + _UNARY_CALL_METHOD_WITH_SLEEP, + request_serializer=messages_pb2.SimpleRequest.SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString, + ) + greenlet = group.spawn(self._run_client, UnaryCallWithSleep) + # release loop so that greenlet can take control + gevent.sleep() + group.killone(greenlet, exception=Exception) + self.assertFalse(self._unhandled_exception, "Unhandled exception") + self.assertRaises(Exception, greenlet.get) + + def _run_client(self, call): + try: + call.with_call(messages_pb2.SimpleRequest()) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.CANCELLED: + raise + + def _global_exception_handler(self, exctype, value, tb): + if exctype == gevent.GreenletExit: + self._unhandled_exception = True + return + sys.__excepthook__(exctype, value, tb) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tools/distrib/pylint_code.sh b/tools/distrib/pylint_code.sh index 060c2b7ffd9..83faa932ba9 100755 --- a/tools/distrib/pylint_code.sh +++ b/tools/distrib/pylint_code.sh @@ -32,6 +32,7 @@ DIRS=( TEST_DIRS=( 'src/python/grpcio_tests/tests' + 'src/python/grpcio_tests/tests_gevent' ) VIRTUALENV=python_pylint_venv diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 5176331c3fb..afb96226dbb 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -622,13 +622,16 @@ class PythonConfig( class PythonLanguage(object): _TEST_SPECS_FILE = { - 'native': 'src/python/grpcio_tests/tests/tests.json', - 'gevent': 'src/python/grpcio_tests/tests/tests.json', - 'asyncio': 'src/python/grpcio_tests/tests_aio/tests.json', + 'native': ['src/python/grpcio_tests/tests/tests.json'], + 'gevent': [ + 'src/python/grpcio_tests/tests/tests.json', + 'src/python/grpcio_tests/tests_gevent/tests.json', + ], + 'asyncio': ['src/python/grpcio_tests/tests_aio/tests.json'], } _TEST_FOLDER = { 'native': 'test', - 'gevent': 'test', + 'gevent': 
'test_gevent', 'asyncio': 'test_aio', } @@ -639,9 +642,11 @@ class PythonLanguage(object): def test_specs(self): # load list of known test suites - with open(self._TEST_SPECS_FILE[ - self.args.iomgr_platform]) as tests_json_file: - tests_json = json.load(tests_json_file) + tests_json = [] + for tests_json_file_name in self._TEST_SPECS_FILE[ + self.args.iomgr_platform]: + with open(tests_json_file_name) as tests_json_file: + tests_json.extend(json.load(tests_json_file)) environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS) # TODO(https://github.com/grpc/grpc/issues/21401) Fork handlers is not # designed for non-native IO manager. It has a side-effect that @@ -773,7 +778,7 @@ class PythonLanguage(object): major='3', config_vars=config_vars) - if args.iomgr_platform == 'asyncio': + if args.iomgr_platform in ('asyncio', 'gevent'): if args.compiler not in ('default', 'python3.6', 'python3.7', 'python3.8'): raise Exception( @@ -789,7 +794,7 @@ class PythonLanguage(object): else: return (python38_config,) else: - if args.iomgr_platform == 'asyncio': + if args.iomgr_platform in ('asyncio', 'gevent'): return (python36_config, python38_config) elif os.uname()[0] == 'Darwin': # NOTE(rbellevi): Testing takes significantly longer on