Handle gevent exception in gevent poller (#26058)

* Handle gevent exception in gevent poller

Currently the gevent poller ignores exceptions raised by
`gevent.wait()`, which causes greenlets to be unkillable while waiting.

This change handles exceptions raised while waiting in the gevent
poller, cancels the gRPC call and propagates the error back to the
application.

Co-authored-by: Kostis Lolos <klolos@arrikto.com>

* Fix imports in header files

* Lint gevent tests

* Set grpc event type to GRPC_QUEUE_SHUTDOWN upon cancel error

To prevent `grpc_completion_queue_next()` from being called indefinitely when
the queue is shut down.

* Remove unnecessary `except *`

* Improve gevent tests

* Format code

* Remove unnecessary import

Co-authored-by: Kostis Lolos <klolos@arrikto.com>
reviewable/pr25586/r5^2
Isabel Andrade 4 years ago committed by GitHub
parent c97da0fc25
commit aaa7f13b17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/core/lib/iomgr/pollset_custom.cc
  2. 4
      src/core/lib/iomgr/pollset_custom.h
  3. 4
      src/core/lib/iomgr/pollset_uv.cc
  4. 6
      src/core/lib/iomgr/pollset_uv.h
  5. 6
      src/core/lib/surface/completion_queue.cc
  6. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  7. 2
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  8. 4
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  9. 19
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
  10. 2
      src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi
  11. 2
      src/python/grpcio_tests/commands.py
  12. 13
      src/python/grpcio_tests/tests_gevent/__init__.py
  13. 1
      src/python/grpcio_tests/tests_gevent/tests.json
  14. 13
      src/python/grpcio_tests/tests_gevent/unit/__init__.py
  15. 58
      src/python/grpcio_tests/tests_gevent/unit/_test_server.py
  16. 102
      src/python/grpcio_tests/tests_gevent/unit/close_channel_test.py
  17. 1
      tools/distrib/pylint_code.sh
  18. 23
      tools/run_tests/run_tests.py

@ -77,14 +77,14 @@ static grpc_error_handle pollset_work(grpc_pollset* pollset,
// control back to the application
grpc_core::ExecCtx* curr = grpc_core::ExecCtx::Get();
grpc_core::ExecCtx::Set(nullptr);
poller_vtable->poll(static_cast<size_t>(timeout));
grpc_error* err = poller_vtable->poll(static_cast<size_t>(timeout));
grpc_core::ExecCtx::Set(curr);
grpc_core::ExecCtx::Get()->InvalidateNow();
if (grpc_core::ExecCtx::Get()->HasWork()) {
grpc_core::ExecCtx::Get()->Flush();
}
gpr_mu_lock(&pollset->mu);
return GRPC_ERROR_NONE;
return err;
}
static grpc_error_handle pollset_kick(

@ -21,11 +21,13 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/error.h"
#include <stddef.h>
typedef struct grpc_custom_poller_vtable {
void (*init)();
void (*poll)(size_t timeout_ms);
grpc_error* (*poll)(size_t timeout_ms);
void (*kick)();
void (*shutdown)();
} grpc_custom_poller_vtable;

@ -24,6 +24,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_custom.h"
#include <uv.h>
@ -54,7 +55,7 @@ static void empty_timer_cb(uv_timer_t* handle) {}
static void kick_timer_cb(uv_timer_t* handle) { g_kicked = false; }
static void run_loop(size_t timeout) {
static grpc_error* run_loop(size_t timeout) {
if (grpc_pollset_work_run_loop) {
if (timeout == 0) {
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
@ -64,6 +65,7 @@ static void run_loop(size_t timeout) {
uv_timer_stop(&g_handle->poll_timer);
}
}
return GRPC_ERROR_NONE;
}
static void kick() {

@ -19,11 +19,15 @@
#ifndef GRPC_CORE_LIB_IOMGR_POLLSET_UV_H
#define GRPC_CORE_LIB_IOMGR_POLLSET_UV_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/error.h"
extern int grpc_pollset_work_run_loop;
typedef struct grpc_custom_poller_vtable {
void (*init)(void);
void (*run_loop)(int blocking);
grpc_error* (*run_loop)(int blocking);
} grpc_custom_poller_vtable;
void grpc_custom_pollset_global_init(grpc_custom_poller_vtable* vtable);

@ -1065,7 +1065,11 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
gpr_log(GPR_ERROR, "Completion queue next failed: %s",
grpc_error_std_string(err).c_str());
GRPC_ERROR_UNREF(err);
ret.type = GRPC_QUEUE_TIMEOUT;
if (err == GRPC_ERROR_CANCELLED) {
ret.type = GRPC_QUEUE_SHUTDOWN;
} else {
ret.type = GRPC_QUEUE_TIMEOUT;
}
ret.success = 0;
dump_pending_tags(cq);
break;

@ -208,8 +208,8 @@ cdef void asyncio_kick_loop() with gil:
pass
cdef void asyncio_run_loop(size_t timeout_ms) with gil:
pass
cdef grpc_error* asyncio_run_loop(size_t timeout_ms) with gil:
return grpc_error_none()
def _auth_plugin_callback_wrapper(object cb,

@ -31,7 +31,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep
c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
if gpr_time_cmp(c_timeout, c_deadline) > 0:
c_timeout = c_deadline
c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
if (c_event.type != GRPC_QUEUE_TIMEOUT or

@ -699,3 +699,7 @@ cdef extern from "grpc/grpc_security_constants.h":
ctypedef enum grpc_local_connect_type:
UDS
LOCAL_TCP
cdef extern from "src/core/lib/iomgr/error.h":
const grpc_error* GRPC_ERROR_CANCELLED

@ -15,6 +15,7 @@
from libc cimport string
import errno
import sys
gevent_g = None
gevent_socket = None
gevent_hub = None
@ -348,12 +349,24 @@ cdef void destroy_loop() with gil:
cdef void kick_loop() with gil:
g_event.set()
cdef void run_loop(size_t timeout_ms) with gil:
timeout = timeout_ms / 1000.0
if timeout_ms > 0:
def _run_loop(timeout_ms):
timeout = timeout_ms / 1000.0
if timeout_ms > 0:
try:
g_event.wait(timeout)
finally:
g_event.clear()
cdef grpc_error* run_loop(size_t timeout_ms) with gil:
try:
_run_loop(timeout_ms)
return grpc_error_none()
except BaseException:
exc_info = sys.exc_info()
# Avoid running any Python code after setting the exception
cpython.PyErr_SetObject(exc_info[0], exc_info[1])
return GRPC_ERROR_CANCELLED
###############################
### Initializer ###############
###############################

@ -110,7 +110,7 @@ cdef extern from "src/core/lib/iomgr/timer_custom.h":
cdef extern from "src/core/lib/iomgr/pollset_custom.h":
struct grpc_custom_poller_vtable:
void (*init)()
void (*poll)(size_t timeout_ms)
grpc_error* (*poll)(size_t timeout_ms)
void (*kick)()
void (*shutdown)()

@ -257,7 +257,7 @@ class TestGevent(setuptools.Command):
import tests
loader = tests.Loader()
loader.loadTestsFromNames(['tests'])
loader.loadTestsFromNames(['tests', 'tests_gevent'])
runner = tests.Runner()
if sys.platform == 'win32':
runner.skip_tests(self.BANNED_TESTS + self.BANNED_WINDOWS_TESTS)

@ -0,0 +1,13 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1 @@
["unit.close_channel_test.CloseChannelTest"]

@ -0,0 +1,13 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,58 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
from src.proto.grpc.testing import messages_pb2, test_pb2_grpc
import grpc
import gevent
from typing import Any, Tuple
LONG_UNARY_CALL_WITH_SLEEP_VALUE = 1
class TestServiceServicer(test_pb2_grpc.TestServiceServicer):
    """TestService implementation used by the gevent unit tests."""

    def UnaryCall(self, request, context):
        """Reply right away with a default-constructed ``SimpleResponse``."""
        reply = messages_pb2.SimpleResponse()
        return reply

    def UnaryCallWithSleep(self, unused_request, unused_context):
        """Yield to gevent for the configured delay, then reply.

        The delay is ``LONG_UNARY_CALL_WITH_SLEEP_VALUE``; the reply is a
        default-constructed ``SimpleResponse``.
        """
        delay = LONG_UNARY_CALL_WITH_SLEEP_VALUE
        gevent.sleep(delay)
        reply = messages_pb2.SimpleResponse()
        return reply
def start_test_server(port: int = 0) -> Tuple[str, Any]:
    """Create, configure and start an insecure gRPC test server.

    Args:
        port: Port number formatted into the ``'[::]:%d'`` listen address.

    Returns:
        A ``(target, server)`` tuple where ``target`` is
        ``'localhost:<bound port>'`` (using the port value returned by
        ``add_insecure_port``) and ``server`` is the started server object.
    """
    server = grpc.server(futures.ThreadPoolExecutor())

    # Use ONE servicer instance for both registration paths so the generated
    # service methods and the extra generic handler share the same object.
    servicer = TestServiceServicer()
    test_pb2_grpc.add_TestServiceServicer_to_server(servicer, server)
    server.add_generic_rpc_handlers((_create_extra_generic_handler(servicer),))

    # Keep the requested port and the actually bound port in separate names.
    bound_port = server.add_insecure_port('[::]:%d' % port)
    server.start()
    return 'localhost:%d' % bound_port, server
def _create_extra_generic_handler(servicer: TestServiceServicer) -> Any:
    """Build a generic handler for test-only methods missing from the proto.

    Args:
        servicer: Servicer whose ``UnaryCallWithSleep`` method backs the
            extra ``UnaryCallWithSleep`` RPC.

    Returns:
        The generic handler for service ``grpc.testing.TestService`` exposing
        the extra method.
    """
    # Methods added programmatically because the proto file does not declare
    # them; they are only used by the tests.
    sleep_handler = grpc.unary_unary_rpc_method_handler(
        servicer.UnaryCallWithSleep,
        request_deserializer=messages_pb2.SimpleRequest.FromString,
        response_serializer=messages_pb2.SimpleResponse.SerializeToString,
    )
    extra_handlers = {'UnaryCallWithSleep': sleep_handler}
    return grpc.method_handlers_generic_handler('grpc.testing.TestService',
                                                extra_handlers)

@ -0,0 +1,102 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from src.proto.grpc.testing import messages_pb2, test_pb2_grpc
import grpc
import gevent
import sys
from gevent.pool import Group
from tests_gevent.unit._test_server import start_test_server
_UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep'
class CloseChannelTest(unittest.TestCase):
    """Checks that greenlets blocked in a gRPC call can be closed or killed.

    Each test starts a local test server and an insecure channel to it, and
    installs a ``sys.excepthook`` that records whether a ``GreenletExit``
    reached the global exception hook.
    """

    def setUp(self):
        self._server_target, self._server = start_test_server()
        self._channel = grpc.insecure_channel(self._server_target)
        self._unhandled_exception = False
        # sys.excepthook is process-wide state: register its restoration
        # before replacing it so the hook (and the reference it holds to this
        # test instance) does not leak into tests that run afterwards.
        self.addCleanup(setattr, sys, 'excepthook', sys.excepthook)
        sys.excepthook = self._global_exception_handler

    def tearDown(self):
        self._channel.close()
        self._server.stop(None)

    def test_graceful_close(self):
        """A completed unary call followed by a channel close reports OK."""
        stub = test_pb2_grpc.TestServiceStub(self._channel)
        _, response = stub.UnaryCall.with_call(messages_pb2.SimpleRequest())

        self._channel.close()

        self.assertEqual(grpc.StatusCode.OK, response.code())

    def test_graceful_close_in_greenlet(self):
        """Closing the channel, then killing the client greenlet, is clean."""
        group = Group()
        stub = test_pb2_grpc.TestServiceStub(self._channel)
        greenlet = group.spawn(self._run_client, stub.UnaryCall)
        # release loop so that greenlet can take control
        gevent.sleep()
        self._channel.close()
        group.killone(greenlet)
        self.assertFalse(self._unhandled_exception, "Unhandled GreenletExit")
        try:
            greenlet.get()
        except Exception as e:  # pylint: disable=broad-except
            self.fail(f"Unexpected exception in greenlet: {e}")

    def test_ungraceful_close_in_greenlet(self):
        """Killing a greenlet blocked in a slow call leaks no GreenletExit."""
        group = Group()
        UnaryCallWithSleep = self._channel.unary_unary(
            _UNARY_CALL_METHOD_WITH_SLEEP,
            request_serializer=messages_pb2.SimpleRequest.SerializeToString,
            response_deserializer=messages_pb2.SimpleResponse.FromString,
        )
        greenlet = group.spawn(self._run_client, UnaryCallWithSleep)
        # release loop so that greenlet can take control
        gevent.sleep()
        group.killone(greenlet)
        self.assertFalse(self._unhandled_exception, "Unhandled GreenletExit")

    def test_kill_greenlet_with_generic_exception(self):
        """Killing with a generic exception surfaces it via ``get()``."""
        group = Group()
        UnaryCallWithSleep = self._channel.unary_unary(
            _UNARY_CALL_METHOD_WITH_SLEEP,
            request_serializer=messages_pb2.SimpleRequest.SerializeToString,
            response_deserializer=messages_pb2.SimpleResponse.FromString,
        )
        greenlet = group.spawn(self._run_client, UnaryCallWithSleep)
        # release loop so that greenlet can take control
        gevent.sleep()
        group.killone(greenlet, exception=Exception)
        self.assertFalse(self._unhandled_exception, "Unhandled exception")
        self.assertRaises(Exception, greenlet.get)

    def _run_client(self, call):
        """Invoke ``call`` once, swallowing only CANCELLED RPC errors."""
        try:
            call.with_call(messages_pb2.SimpleRequest())
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED:
                raise

    def _global_exception_handler(self, exctype, value, tb):
        """Record an unhandled ``GreenletExit``; delegate everything else."""
        if exctype is gevent.GreenletExit:
            self._unhandled_exception = True
            return
        sys.__excepthook__(exctype, value, tb)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -32,6 +32,7 @@ DIRS=(
TEST_DIRS=(
'src/python/grpcio_tests/tests'
'src/python/grpcio_tests/tests_gevent'
)
VIRTUALENV=python_pylint_venv

@ -622,13 +622,16 @@ class PythonConfig(
class PythonLanguage(object):
_TEST_SPECS_FILE = {
'native': 'src/python/grpcio_tests/tests/tests.json',
'gevent': 'src/python/grpcio_tests/tests/tests.json',
'asyncio': 'src/python/grpcio_tests/tests_aio/tests.json',
'native': ['src/python/grpcio_tests/tests/tests.json'],
'gevent': [
'src/python/grpcio_tests/tests/tests.json',
'src/python/grpcio_tests/tests_gevent/tests.json',
],
'asyncio': ['src/python/grpcio_tests/tests_aio/tests.json'],
}
_TEST_FOLDER = {
'native': 'test',
'gevent': 'test',
'gevent': 'test_gevent',
'asyncio': 'test_aio',
}
@ -639,9 +642,11 @@ class PythonLanguage(object):
def test_specs(self):
# load list of known test suites
with open(self._TEST_SPECS_FILE[
self.args.iomgr_platform]) as tests_json_file:
tests_json = json.load(tests_json_file)
tests_json = []
for tests_json_file_name in self._TEST_SPECS_FILE[
self.args.iomgr_platform]:
with open(tests_json_file_name) as tests_json_file:
tests_json.extend(json.load(tests_json_file))
environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS)
# TODO(https://github.com/grpc/grpc/issues/21401) Fork handlers is not
# designed for non-native IO manager. It has a side-effect that
@ -773,7 +778,7 @@ class PythonLanguage(object):
major='3',
config_vars=config_vars)
if args.iomgr_platform == 'asyncio':
if args.iomgr_platform in ('asyncio', 'gevent'):
if args.compiler not in ('default', 'python3.6', 'python3.7',
'python3.8'):
raise Exception(
@ -789,7 +794,7 @@ class PythonLanguage(object):
else:
return (python38_config,)
else:
if args.iomgr_platform == 'asyncio':
if args.iomgr_platform in ('asyncio', 'gevent'):
return (python36_config, python38_config)
elif os.uname()[0] == 'Darwin':
# NOTE(rbellevi): Testing takes significantly longer on

Loading…
Cancel
Save