diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi index 13256ed49b8..1f59cfd159d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi @@ -32,12 +32,16 @@ def _spawn_greenlet(*args): cdef class SocketWrapper: def __cinit__(self): + fork_handlers_and_grpc_init() self.sockopts = [] self.socket = None self.c_socket = NULL self.c_buffer = NULL self.len = 0 + def __dealloc__(self): + grpc_shutdown_blocking() + cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil: sw = SocketWrapper() sw.c_socket = socket @@ -258,10 +262,14 @@ cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client, cdef class ResolveWrapper: def __cinit__(self): + fork_handlers_and_grpc_init() self.c_resolver = NULL self.c_host = NULL self.c_port = NULL + def __dealloc__(self): + grpc_shutdown_blocking() + cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper): try: res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port) @@ -298,6 +306,7 @@ cdef grpc_error* socket_resolve(char* host, char* port, cdef class TimerWrapper: def __cinit__(self, deadline): + fork_handlers_and_grpc_init() self.timer = gevent_hub.get_hub().loop.timer(deadline) self.event = None @@ -314,6 +323,9 @@ cdef class TimerWrapper: self.event.set() self.timer.stop() + def __dealloc__(self): + grpc_shutdown_blocking() + cdef void timer_start(grpc_custom_timer* t) with gil: timer = TimerWrapper(t.timeout_ms / 1000.0) timer.c_timer = t diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index c636ee5d4e3..eb702f6c8b0 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -50,6 +50,7 @@ "unit._empty_message_test.EmptyMessageTest", "unit._error_message_encoding_test.ErrorMessageEncodingTest", "unit._exit_test.ExitTest", + "unit._grpc_shutdown_test.GrpcShutdownTest", "unit._interceptor_test.InterceptorTest", "unit._invalid_metadata_test.InvalidMetadataTest", "unit._invocation_defects_test.InvocationDefectsTest", diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 5fb7c1f74d0..42da77dbad2 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -18,6 +18,7 @@ GRPCIO_TESTS_UNIT = [ "_empty_message_test.py", "_error_message_encoding_test.py", "_exit_test.py", + "_grpc_shutdown_test.py", "_interceptor_test.py", "_invalid_metadata_test.py", "_invocation_defects_test.py", diff --git a/src/python/grpcio_tests/tests/unit/_grpc_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_grpc_shutdown_test.py new file mode 100644 index 00000000000..1c4890b97f1 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_grpc_shutdown_test.py @@ -0,0 +1,54 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests the gRPC Core shutdown path.""" + +import time +import threading +import unittest +import datetime + +import grpc + +_TIMEOUT_FOR_SEGFAULT = datetime.timedelta(seconds=10) + + +class GrpcShutdownTest(unittest.TestCase): + + def test_channel_close_with_connectivity_watcher(self): + """Originated by https://github.com/grpc/grpc/issues/20299. + + The grpc_shutdown happens synchronously, but there might be Core object + references left in Cython which might lead to ABORT or SIGSEGV. + """ + connection_failed = threading.Event() + + def on_state_change(state): + if state in (grpc.ChannelConnectivity.TRANSIENT_FAILURE, + grpc.ChannelConnectivity.SHUTDOWN): + connection_failed.set() + + # Connects to an void address, and subscribes state changes + channel = grpc.insecure_channel("0.1.1.1:12345") + channel.subscribe(on_state_change, True) + + deadline = datetime.datetime.now() + _TIMEOUT_FOR_SEGFAULT + + while datetime.datetime.now() < deadline: + time.sleep(0.1) + if connection_failed.is_set(): + channel.close() + + +if __name__ == '__main__': + unittest.main(verbosity=2)