Merge pull request #20891 from lidizheng/fix-3-i20299

Make sure Core aware of gevent Cython objects
pull/20824/head
Lidi Zheng 5 years ago committed by GitHub
commit 13bc82ce9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
  2. 1
      src/python/grpcio_tests/tests/tests.json
  3. 1
      src/python/grpcio_tests/tests/unit/BUILD.bazel
  4. 54
      src/python/grpcio_tests/tests/unit/_grpc_shutdown_test.py

@ -32,12 +32,16 @@ def _spawn_greenlet(*args):
cdef class SocketWrapper:
def __cinit__(self):
fork_handlers_and_grpc_init()
self.sockopts = []
self.socket = None
self.c_socket = NULL
self.c_buffer = NULL
self.len = 0
def __dealloc__(self):
grpc_shutdown_blocking()
cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil:
sw = SocketWrapper()
sw.c_socket = socket
@ -258,10 +262,14 @@ cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
cdef class ResolveWrapper:
def __cinit__(self):
fork_handlers_and_grpc_init()
self.c_resolver = NULL
self.c_host = NULL
self.c_port = NULL
def __dealloc__(self):
grpc_shutdown_blocking()
cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
try:
res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port)
@ -298,6 +306,7 @@ cdef grpc_error* socket_resolve(char* host, char* port,
cdef class TimerWrapper:
def __cinit__(self, deadline):
fork_handlers_and_grpc_init()
self.timer = gevent_hub.get_hub().loop.timer(deadline)
self.event = None
@ -314,6 +323,9 @@ cdef class TimerWrapper:
self.event.set()
self.timer.stop()
def __dealloc__(self):
grpc_shutdown_blocking()
cdef void timer_start(grpc_custom_timer* t) with gil:
timer = TimerWrapper(t.timeout_ms / 1000.0)
timer.c_timer = t

@ -50,6 +50,7 @@
"unit._empty_message_test.EmptyMessageTest",
"unit._error_message_encoding_test.ErrorMessageEncodingTest",
"unit._exit_test.ExitTest",
"unit._grpc_shutdown_test.GrpcShutdownTest",
"unit._interceptor_test.InterceptorTest",
"unit._invalid_metadata_test.InvalidMetadataTest",
"unit._invocation_defects_test.InvocationDefectsTest",

@ -18,6 +18,7 @@ GRPCIO_TESTS_UNIT = [
"_empty_message_test.py",
"_error_message_encoding_test.py",
"_exit_test.py",
"_grpc_shutdown_test.py",
"_interceptor_test.py",
"_invalid_metadata_test.py",
"_invocation_defects_test.py",

@ -0,0 +1,54 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests the gRPC Core shutdown path."""
import time
import threading
import unittest
import datetime
import grpc
_TIMEOUT_FOR_SEGFAULT = datetime.timedelta(seconds=10)
class GrpcShutdownTest(unittest.TestCase):
def test_channel_close_with_connectivity_watcher(self):
"""Originated by https://github.com/grpc/grpc/issues/20299.
The grpc_shutdown happens synchronously, but there might be Core object
references left in Cython which might lead to ABORT or SIGSEGV.
"""
connection_failed = threading.Event()
def on_state_change(state):
if state in (grpc.ChannelConnectivity.TRANSIENT_FAILURE,
grpc.ChannelConnectivity.SHUTDOWN):
connection_failed.set()
# Connects to an void address, and subscribes state changes
channel = grpc.insecure_channel("0.1.1.1:12345")
channel.subscribe(on_state_change, True)
deadline = datetime.datetime.now() + _TIMEOUT_FOR_SEGFAULT
while datetime.datetime.now() < deadline:
time.sleep(0.1)
if connection_failed.is_set():
channel.close()
if __name__ == '__main__':
unittest.main(verbosity=2)
Loading…
Cancel
Save