From 2ea6d3ef0b1b8fe6085c34de68ec10eb5bbf1d32 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 29 Aug 2018 22:20:21 -0700 Subject: [PATCH 1/2] Add fork tests as Python unit tests --- src/python/grpcio/grpc/_channel.py | 3 +- .../grpc/_cython/_cygrpc/fork_posix.pyx.pxi | 44 +++-- src/python/grpcio_tests/commands.py | 2 + .../tests/fork/_fork_interop_test.py | 152 ++++++++++++++++++ src/python/grpcio_tests/tests/fork/client.py | 6 +- src/python/grpcio_tests/tests/fork/methods.py | 98 ++++++----- src/python/grpcio_tests/tests/tests.json | 1 + 7 files changed, 241 insertions(+), 65 deletions(-) create mode 100644 src/python/grpcio_tests/tests/fork/_fork_interop_test.py diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index df06ffaeb3b..1d2495cdd21 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -1033,6 +1033,7 @@ class Channel(grpc.Channel): def _close(self): self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') + cygrpc.fork_unregister_channel(self) _moot(self._connectivity_state) def _close_on_fork(self): @@ -1060,8 +1061,6 @@ class Channel(grpc.Channel): # for as long as they are in use and to close them after using them, # then deletion of this grpc._channel.Channel instance can be made to # effect closure of the underlying cygrpc.Channel instance. - if cygrpc is not None: # Globals may have already been collected. - cygrpc.fork_unregister_channel(self) # This prevent the failed-at-initializing object removal from failing. # Though the __init__ failed, the removal will still trigger __del__. if _moot is not None and hasattr(self, '_connectivity_state'): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi index 433ae1f374f..6dbd6a985e3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - import logging import os import threading @@ -37,8 +36,12 @@ _GRPC_ENABLE_FORK_SUPPORT = ( os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0') .lower() in _TRUE_VALUES) +_fork_handler_failed = False + cdef void __prefork() nogil: with gil: + global _fork_handler_failed + _fork_handler_failed = False with _fork_state.fork_in_progress_condition: _fork_state.fork_in_progress = True if not _fork_state.active_thread_count.await_zero_threads( @@ -46,6 +49,7 @@ cdef void __prefork() nogil: _LOGGER.error( 'Failed to shutdown gRPC Python threads prior to fork. ' 'Behavior after fork will be undefined.') + _fork_handler_failed = True cdef void __postfork_parent() nogil: @@ -57,20 +61,28 @@ cdef void __postfork_parent() nogil: cdef void __postfork_child() nogil: with gil: - # Thread could be holding the fork_in_progress_condition inside of - # block_if_fork_in_progress() when fork occurs. Reset the lock here. - _fork_state.fork_in_progress_condition = threading.Condition() - # A thread in return_from_user_request_generator() may hold this lock - # when fork occurs. - _fork_state.active_thread_count = _ActiveThreadCount() - for state_to_reset in _fork_state.postfork_states_to_reset: - state_to_reset.reset_postfork_child() - _fork_state.fork_epoch += 1 - for channel in _fork_state.channels: - channel._close_on_fork() - # TODO(ericgribkoff) Check and abort if core is not shutdown - with _fork_state.fork_in_progress_condition: - _fork_state.fork_in_progress = False + try: + if _fork_handler_failed: + return + # Thread could be holding the fork_in_progress_condition inside of + # block_if_fork_in_progress() when fork occurs. Reset the lock here. + _fork_state.fork_in_progress_condition = threading.Condition() + # A thread in return_from_user_request_generator() may hold this lock + # when fork occurs. + _fork_state.active_thread_count = _ActiveThreadCount() + for state_to_reset in _fork_state.postfork_states_to_reset: + state_to_reset.reset_postfork_child() + _fork_state.postfork_states_to_reset = [] + _fork_state.fork_epoch += 1 + for channel in _fork_state.channels: + channel._close_on_fork() + with _fork_state.fork_in_progress_condition: + _fork_state.fork_in_progress = False + except: + _LOGGER.error('Exiting child due to raised exception') + _LOGGER.error(sys.exc_info()[0]) + os._exit(os.EX_USAGE) + if grpc_is_initialized() > 0: with gil: _LOGGER.error('Failed to shutdown gRPC Core after fork()') @@ -148,7 +160,7 @@ def fork_register_channel(channel): def fork_unregister_channel(channel): if _GRPC_ENABLE_FORK_SUPPORT: - _fork_state.channels.remove(channel) + _fork_state.channels.discard(channel) class _ActiveThreadCount(object): diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 582ce898dee..866fb6de1f7 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -111,6 +111,8 @@ class TestGevent(setuptools.Command): """Command to run tests w/gevent.""" BANNED_TESTS = ( + # Fork support is not compatible with gevent + 'fork._fork_interop_test.ForkInteropTest', # These tests send a lot of RPCs and are really slow on gevent. They will # eventually succeed, but need to dig into performance issues. 'unit._cython._no_messages_server_completion_queue_per_call_test.Test.test_rpcs', diff --git a/src/python/grpcio_tests/tests/fork/_fork_interop_test.py b/src/python/grpcio_tests/tests/fork/_fork_interop_test.py new file mode 100644 index 00000000000..bbcfb7446a9 --- /dev/null +++ b/src/python/grpcio_tests/tests/fork/_fork_interop_test.py @@ -0,0 +1,152 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Client-side fork interop tests as a unit test.""" + +import six +import subprocess +import sys +import threading +import unittest +from grpc._cython import cygrpc +from tests.fork import methods + +# New instance of multiprocessing.Process using fork without exec can and will +# hang if the Python process has any other threads running. This includes the +# additional thread spawned by our _runner.py class. So in order to test our +# compatibility with multiprocessing, we first fork+exec a new process to ensure +# we don't have any conflicting background threads. +_CLIENT_FORK_SCRIPT_TEMPLATE = """if True: + import os + import sys + from grpc._cython import cygrpc + from tests.fork import methods + + cygrpc._GRPC_ENABLE_FORK_SUPPORT = True + os.environ['GRPC_POLL_STRATEGY'] = 'epoll1' + methods.TestCase.%s.run_test({ + 'server_host': 'localhost', + 'server_port': %d, + 'use_tls': False + }) +""" +_SUBPROCESS_TIMEOUT_S = 30 + + +@unittest.skipUnless( + sys.platform.startswith("linux"), + "not supported on windows, and fork+exec networking blocked on mac") +@unittest.skipUnless(six.PY2, "https://github.com/grpc/grpc/issues/18075") +class ForkInteropTest(unittest.TestCase): + + def setUp(self): + start_server_script = """if True: + import sys + import time + + import grpc + from src.proto.grpc.testing import test_pb2_grpc + from tests.interop import methods as interop_methods + from tests.unit import test_common + + server = test_common.test_server() + test_pb2_grpc.add_TestServiceServicer_to_server( + interop_methods.TestService(), server) + port = server.add_insecure_port('[::]:0') + server.start() + print(port) + sys.stdout.flush() + while True: + time.sleep(1) + """ + self._server_process = subprocess.Popen( + [sys.executable, '-c', start_server_script], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, + self._server_process.kill) + try: + timer.start() + self._port = int(self._server_process.stdout.readline()) + except ValueError: + raise Exception('Failed to get port from server') + finally: + timer.cancel() + + def testConnectivityWatch(self): + self._verifyTestCase(methods.TestCase.CONNECTIVITY_WATCH) + + def testCloseChannelBeforeFork(self): + self._verifyTestCase(methods.TestCase.CLOSE_CHANNEL_BEFORE_FORK) + + def testAsyncUnarySameChannel(self): + self._verifyTestCase(methods.TestCase.ASYNC_UNARY_SAME_CHANNEL) + + def testAsyncUnaryNewChannel(self): + self._verifyTestCase(methods.TestCase.ASYNC_UNARY_NEW_CHANNEL) + + def testBlockingUnarySameChannel(self): + self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_SAME_CHANNEL) + + def testBlockingUnaryNewChannel(self): + self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_NEW_CHANNEL) + + def testInProgressBidiContinueCall(self): + self._verifyTestCase(methods.TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL) + + def testInProgressBidiSameChannelAsyncCall(self): + self._verifyTestCase( + methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL) + + def testInProgressBidiSameChannelBlockingCall(self): + self._verifyTestCase( + methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL) + + def testInProgressBidiNewChannelAsyncCall(self): + self._verifyTestCase( + methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL) + + def testInProgressBidiNewChannelBlockingCall(self): + self._verifyTestCase( + methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL) + + def tearDown(self): + self._server_process.kill() + + def _verifyTestCase(self, test_case): + script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port) + process = subprocess.Popen( + [sys.executable, '-c', script], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, process.kill) + try: + timer.start() + try: + out, err = process.communicate(timeout=_SUBPROCESS_TIMEOUT_S) + except TypeError: + # The timeout parameter was added in Python 3.3. + out, err = process.communicate() + except subprocess.TimeoutExpired: + process.kill() + raise ValueError('Process failed to terminate') + finally: + timer.cancel() + self.assertEqual( + 0, process.returncode, + 'process failed with exit code %d (stdout: %s, stderr: %s)' % + (process.returncode, out, err)) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/fork/client.py b/src/python/grpcio_tests/tests/fork/client.py index 9a32629ed5a..94a20b33714 100644 --- a/src/python/grpcio_tests/tests/fork/client.py +++ b/src/python/grpcio_tests/tests/fork/client.py @@ -63,12 +63,12 @@ def _test_case_from_arg(test_case_arg): def test_fork(): logging.basicConfig(level=logging.INFO) - args = _args() - if args.test_case == "all": + args = vars(_args()) + if args['test_case'] == "all": for test_case in methods.TestCase: test_case.run_test(args) else: - test_case = _test_case_from_arg(args.test_case) + test_case = _test_case_from_arg(args['test_case']) test_case.run_test(args) diff --git a/src/python/grpcio_tests/tests/fork/methods.py b/src/python/grpcio_tests/tests/fork/methods.py index 889ef13cb28..9481d004cd9 100644 --- a/src/python/grpcio_tests/tests/fork/methods.py +++ b/src/python/grpcio_tests/tests/fork/methods.py @@ -30,11 +30,13 @@ from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import test_pb2_grpc _LOGGER = logging.getLogger(__name__) +_RPC_TIMEOUT_S = 10 +_CHILD_FINISH_TIMEOUT_S = 60 def _channel(args): - target = '{}:{}'.format(args.server_host, args.server_port) - if args.use_tls: + target = '{}:{}'.format(args['server_host'], args['server_port']) + if args['use_tls']: channel_credentials = grpc.ssl_channel_credentials() channel = grpc.secure_channel(target, channel_credentials) else: @@ -57,7 +59,7 @@ def _async_unary(stub): response_type=messages_pb2.COMPRESSABLE, response_size=size, payload=messages_pb2.Payload(body=b'\x00' * 271828)) - response_future = stub.UnaryCall.future(request) + response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S) response = response_future.result() _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) @@ -68,7 +70,7 @@ def _blocking_unary(stub): response_type=messages_pb2.COMPRESSABLE, response_size=size, payload=messages_pb2.Payload(body=b'\x00' * 271828)) - response = stub.UnaryCall(request) + response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S) _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) @@ -121,6 +123,8 @@ class _ChildProcess(object): def record_exceptions(): try: task(*args) + except grpc.RpcError as rpc_error: + self._exceptions.put('RpcError: %s' % rpc_error) except Exception as e: # pylint: disable=broad-except self._exceptions.put(e) @@ -130,7 +134,9 @@ class _ChildProcess(object): self._process.start() def finish(self): - self._process.join() + self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S) + if self._process.is_alive(): + raise ValueError('Child process did not terminate') if self._process.exitcode != 0: raise ValueError('Child process failed with exitcode %d' % self._process.exitcode) @@ -162,10 +168,10 @@ def _async_unary_same_channel(channel): def _async_unary_new_channel(channel, args): def child_target(): - child_channel = _channel(args) - child_stub = test_pb2_grpc.TestServiceStub(child_channel) - _async_unary(child_stub) - child_channel.close() + with _channel(args) as child_channel: + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _async_unary(child_stub) + child_channel.close() stub = test_pb2_grpc.TestServiceStub(channel) _async_unary(stub) @@ -195,10 +201,9 @@ def _blocking_unary_same_channel(channel): def _blocking_unary_new_channel(channel, args): def child_target(): - child_channel = _channel(args) - child_stub = test_pb2_grpc.TestServiceStub(child_channel) - _blocking_unary(child_stub) - child_channel.close() + with _channel(args) as child_channel: + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _blocking_unary(child_stub) stub = test_pb2_grpc.TestServiceStub(channel) _blocking_unary(stub) @@ -213,54 +218,59 @@ def _close_channel_before_fork(channel, args): def child_target(): new_channel.close() - child_channel = _channel(args) - child_stub = test_pb2_grpc.TestServiceStub(child_channel) - _blocking_unary(child_stub) - child_channel.close() + with _channel(args) as child_channel: + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _blocking_unary(child_stub) stub = test_pb2_grpc.TestServiceStub(channel) _blocking_unary(stub) channel.close() - new_channel = _channel(args) - new_stub = test_pb2_grpc.TestServiceStub(new_channel) - child_process = _ChildProcess(child_target) - child_process.start() - _blocking_unary(new_stub) - child_process.finish() + with _channel(args) as new_channel: + new_stub = test_pb2_grpc.TestServiceStub(new_channel) + child_process = _ChildProcess(child_target) + child_process.start() + _blocking_unary(new_stub) + child_process.finish() def _connectivity_watch(channel, args): def child_target(): + child_channel_ready_event = threading.Event() + def child_connectivity_callback(state): - child_states.append(state) + if state is grpc.ChannelConnectivity.READY: + child_channel_ready_event.set() child_states = [] - child_channel = _channel(args) - child_stub = test_pb2_grpc.TestServiceStub(child_channel) - child_channel.subscribe(child_connectivity_callback) - _async_unary(child_stub) - if len(child_states - ) < 2 or child_states[-1] != grpc.ChannelConnectivity.READY: - raise ValueError('Channel did not move to READY') - if len(parent_states) > 1: - raise ValueError('Received connectivity updates on parent callback') - child_channel.unsubscribe(child_connectivity_callback) - child_channel.close() + with _channel(args) as child_channel: + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + child_channel.subscribe(child_connectivity_callback) + _async_unary(child_stub) + if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S): + raise ValueError('Channel did not move to READY') + if len(parent_states) > 1: + raise ValueError( + 'Received connectivity updates on parent callback', + parent_states) + child_channel.unsubscribe(child_connectivity_callback) + + parent_states = [] + parent_channel_ready_event = threading.Event() def parent_connectivity_callback(state): parent_states.append(state) + if state is grpc.ChannelConnectivity.READY: + parent_channel_ready_event.set() - parent_states = [] channel.subscribe(parent_connectivity_callback) stub = test_pb2_grpc.TestServiceStub(channel) child_process = _ChildProcess(child_target) child_process.start() _async_unary(stub) - if len(parent_states - ) < 2 or parent_states[-1] != grpc.ChannelConnectivity.READY: + if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S): raise ValueError('Channel did not move to READY') channel.unsubscribe(parent_connectivity_callback) child_process.finish() @@ -380,9 +390,9 @@ def _in_progress_bidi_same_channel_blocking_call(channel): def _in_progress_bidi_new_channel_async_call(channel, args): def child_target(parent_bidi_call, parent_channel, args): - channel = _channel(args) - stub = test_pb2_grpc.TestServiceStub(channel) - _async_unary(stub) + with _channel(args) as channel: + stub = test_pb2_grpc.TestServiceStub(channel) + _async_unary(stub) _ping_pong_with_child_processes_after_first_response( channel, args, child_target) @@ -391,9 +401,9 @@ def _in_progress_bidi_new_channel_async_call(channel, args): def _in_progress_bidi_new_channel_blocking_call(channel, args): def child_target(parent_bidi_call, parent_channel, args): - channel = _channel(args) - stub = test_pb2_grpc.TestServiceStub(channel) - _blocking_unary(stub) + with _channel(args) as channel: + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) _ping_pong_with_child_processes_after_first_response( channel, args, child_target) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index de4c2c1fdde..00b55b02e89 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -1,6 +1,7 @@ [ "_sanity._sanity_test.SanityTest", "channelz._channelz_servicer_test.ChannelzServicerTest", + "fork._fork_interop_test.ForkInteropTest", "health_check._health_servicer_test.HealthServicerTest", "interop._insecure_intraop_test.InsecureIntraopTest", "interop._secure_intraop_test.SecureIntraopTest", From ff0d2195cfe13545d7a59357590bae49498e0db0 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Fri, 15 Feb 2019 17:00:21 -0800 Subject: [PATCH 2/2] address comments --- .../grpcio_tests/tests/fork/_fork_interop_test.py | 2 +- src/python/grpcio_tests/tests/fork/methods.py | 14 ++++---------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/python/grpcio_tests/tests/fork/_fork_interop_test.py b/src/python/grpcio_tests/tests/fork/_fork_interop_test.py index bbcfb7446a9..608148dfe46 100644 --- a/src/python/grpcio_tests/tests/fork/_fork_interop_test.py +++ b/src/python/grpcio_tests/tests/fork/_fork_interop_test.py @@ -139,7 +139,7 @@ class ForkInteropTest(unittest.TestCase): out, err = process.communicate() except subprocess.TimeoutExpired: process.kill() - raise ValueError('Process failed to terminate') + raise RuntimeError('Process failed to terminate') finally: timer.cancel() self.assertEqual( diff --git a/src/python/grpcio_tests/tests/fork/methods.py b/src/python/grpcio_tests/tests/fork/methods.py index 9481d004cd9..a060ba6e581 100644 --- a/src/python/grpcio_tests/tests/fork/methods.py +++ b/src/python/grpcio_tests/tests/fork/methods.py @@ -136,7 +136,7 @@ class _ChildProcess(object): def finish(self): self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S) if self._process.is_alive(): - raise ValueError('Child process did not terminate') + raise RuntimeError('Child process did not terminate') if self._process.exitcode != 0: raise ValueError('Child process failed with exitcode %d' % self._process.exitcode) @@ -236,6 +236,9 @@ def _close_channel_before_fork(channel, args): def _connectivity_watch(channel, args): + parent_states = [] + parent_channel_ready_event = threading.Event() + def child_target(): child_channel_ready_event = threading.Event() @@ -244,7 +247,6 @@ def _connectivity_watch(channel, args): if state is grpc.ChannelConnectivity.READY: child_channel_ready_event.set() - child_states = [] with _channel(args) as child_channel: child_stub = test_pb2_grpc.TestServiceStub(child_channel) child_channel.subscribe(child_connectivity_callback) @@ -257,9 +259,6 @@ def _connectivity_watch(channel, args): parent_states) child_channel.unsubscribe(child_connectivity_callback) - parent_states = [] - parent_channel_ready_event = threading.Event() - def parent_connectivity_callback(state): parent_states.append(state) if state is grpc.ChannelConnectivity.READY: @@ -275,11 +274,6 @@ def _connectivity_watch(channel, args): channel.unsubscribe(parent_connectivity_callback) child_process.finish() - # Need to unsubscribe or _channel.py in _poll_connectivity triggers a - # "Cannot invoke RPC on closed channel!" error. - # TODO(ericgribkoff) Fix issue with channel.close() and connectivity polling - channel.unsubscribe(parent_connectivity_callback) - def _ping_pong_with_child_processes_after_first_response( channel, args, child_target, run_after_close=True):