Merge pull request #16513 from ericgribkoff/python_unit_fork_tests

Add fork tests as Python unit tests
Eric Gribkoff 6 years ago committed by GitHub
commit b7947776f8
Files changed (7):

  1. src/python/grpcio/grpc/_channel.py (3 changes)
  2. src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi (44 changes)
  3. src/python/grpcio_tests/commands.py (2 changes)
  4. src/python/grpcio_tests/tests/fork/_fork_interop_test.py (152 changes)
  5. src/python/grpcio_tests/tests/fork/client.py (6 changes)
  6. src/python/grpcio_tests/tests/fork/methods.py (106 changes)
  7. src/python/grpcio_tests/tests/tests.json (1 change)

src/python/grpcio/grpc/_channel.py

@@ -1033,6 +1033,7 @@ class Channel(grpc.Channel):

     def _close(self):
         self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
+        cygrpc.fork_unregister_channel(self)
         _moot(self._connectivity_state)

     def _close_on_fork(self):
@@ -1060,8 +1061,6 @@ class Channel(grpc.Channel):
         # for as long as they are in use and to close them after using them,
         # then deletion of this grpc._channel.Channel instance can be made to
         # effect closure of the underlying cygrpc.Channel instance.
-        if cygrpc is not None:  # Globals may have already been collected.
-            cygrpc.fork_unregister_channel(self)
         # This prevent the failed-at-initializing object removal from failing.
         # Though the __init__ failed, the removal will still trigger __del__.
         if _moot is not None and hasattr(self, '_connectivity_state'):
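The net effect of these two hunks is that a channel now unregisters itself from fork bookkeeping when it is explicitly closed, rather than in __del__. For illustration only, a minimal sketch of the client pattern this bookkeeping supports, assuming a local server and the GRPC_ENABLE_FORK_SUPPORT / GRPC_POLL_STRATEGY settings used elsewhere in this PR (the address and RPC details are placeholders, not code from this change):

import os

# Both knobs are read early, so set them before importing grpc to be safe.
os.environ['GRPC_ENABLE_FORK_SUPPORT'] = '1'
os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'

import grpc

channel = grpc.insecure_channel('localhost:50051')  # placeholder address
# ... issue RPCs on the parent channel here ...
channel.close()  # with this PR, close() also unregisters the channel from fork tracking

pid = os.fork()
if pid == 0:
    # Child process: any parent channels still registered were closed by the
    # post-fork handler, so the child opens its own channel.
    with grpc.insecure_channel('localhost:50051') as child_channel:
        pass  # ... issue RPCs on child_channel here ...
    os._exit(0)
else:
    os.waitpid(pid, 0)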

src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi

@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

 import logging
 import os
 import threading
@@ -37,8 +36,12 @@ _GRPC_ENABLE_FORK_SUPPORT = (
     os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0')
         .lower() in _TRUE_VALUES)

+_fork_handler_failed = False
+

 cdef void __prefork() nogil:
     with gil:
+        global _fork_handler_failed
+        _fork_handler_failed = False
         with _fork_state.fork_in_progress_condition:
             _fork_state.fork_in_progress = True
             if not _fork_state.active_thread_count.await_zero_threads(
@@ -46,6 +49,7 @@ cdef void __prefork() nogil:
                 _LOGGER.error(
                     'Failed to shutdown gRPC Python threads prior to fork. '
                     'Behavior after fork will be undefined.')
+                _fork_handler_failed = True


 cdef void __postfork_parent() nogil:
@@ -57,20 +61,28 @@ cdef void __postfork_parent() nogil:

 cdef void __postfork_child() nogil:
     with gil:
-        # Thread could be holding the fork_in_progress_condition inside of
-        # block_if_fork_in_progress() when fork occurs. Reset the lock here.
-        _fork_state.fork_in_progress_condition = threading.Condition()
-        # A thread in return_from_user_request_generator() may hold this lock
-        # when fork occurs.
-        _fork_state.active_thread_count = _ActiveThreadCount()
-        for state_to_reset in _fork_state.postfork_states_to_reset:
-            state_to_reset.reset_postfork_child()
-        _fork_state.fork_epoch += 1
-        for channel in _fork_state.channels:
-            channel._close_on_fork()
-        # TODO(ericgribkoff) Check and abort if core is not shutdown
-        with _fork_state.fork_in_progress_condition:
-            _fork_state.fork_in_progress = False
+        try:
+            if _fork_handler_failed:
+                return
+            # Thread could be holding the fork_in_progress_condition inside of
+            # block_if_fork_in_progress() when fork occurs. Reset the lock here.
+            _fork_state.fork_in_progress_condition = threading.Condition()
+            # A thread in return_from_user_request_generator() may hold this lock
+            # when fork occurs.
+            _fork_state.active_thread_count = _ActiveThreadCount()
+            for state_to_reset in _fork_state.postfork_states_to_reset:
+                state_to_reset.reset_postfork_child()
+            _fork_state.postfork_states_to_reset = []
+            _fork_state.fork_epoch += 1
+            for channel in _fork_state.channels:
+                channel._close_on_fork()
+            with _fork_state.fork_in_progress_condition:
+                _fork_state.fork_in_progress = False
+        except:
+            _LOGGER.error('Exiting child due to raised exception')
+            _LOGGER.error(sys.exc_info()[0])
+            os._exit(os.EX_USAGE)
     if grpc_is_initialized() > 0:
         with gil:
             _LOGGER.error('Failed to shutdown gRPC Core after fork()')
@@ -148,7 +160,7 @@ def fork_register_channel(channel):

 def fork_unregister_channel(channel):
     if _GRPC_ENABLE_FORK_SUPPORT:
-        _fork_state.channels.remove(channel)
+        _fork_state.channels.discard(channel)


 class _ActiveThreadCount(object):
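The handlers above live in gRPC's Cython layer and are installed natively, but their overall shape matches Python's own at-fork hook API. A minimal sketch, assuming Python 3.7+'s os.register_at_fork, with illustrative stand-ins for _fork_state (this is not gRPC's actual registration path):

import os

# Illustrative stand-ins for _fork_state; gRPC keeps this state in Cython.
_fork_handler_failed = False
_registered_channels = set()


def _prefork():
    # Mirrors __prefork(): record whether background work was quiesced in time.
    global _fork_handler_failed
    _fork_handler_failed = not _await_zero_active_threads()  # hypothetical helper


def _postfork_parent():
    # Parent resumes normally; gRPC clears its fork-in-progress flag here.
    pass


def _postfork_child():
    # Mirrors __postfork_child(): skip cleanup if prefork failed, and exit the
    # child outright if cleanup itself raises, since inherited state is unusable.
    if _fork_handler_failed:
        return
    try:
        for channel in list(_registered_channels):
            channel.close()  # analogous to channel._close_on_fork()
        _registered_channels.clear()
    except Exception:
        os._exit(os.EX_USAGE)


def _await_zero_active_threads():
    return True  # placeholder for active_thread_count.await_zero_threads()


os.register_at_fork(before=_prefork,
                    after_in_parent=_postfork_parent,
                    after_in_child=_postfork_child)

Relatedly, the switch from set.remove() to set.discard() in fork_unregister_channel() makes unregistration idempotent: unregistering a channel that was never registered, or unregistering it twice, no longer raises KeyError.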

src/python/grpcio_tests/commands.py

@@ -111,6 +111,8 @@ class TestGevent(setuptools.Command):
     """Command to run tests w/gevent."""

     BANNED_TESTS = (
+        # Fork support is not compatible with gevent
+        'fork._fork_interop_test.ForkInteropTest',
         # These tests send a lot of RPCs and are really slow on gevent. They will
         # eventually succeed, but need to dig into performance issues.
         'unit._cython._no_messages_server_completion_queue_per_call_test.Test.test_rpcs',

src/python/grpcio_tests/tests/fork/_fork_interop_test.py (new file)

@@ -0,0 +1,152 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client-side fork interop tests as a unit test."""
import six
import subprocess
import sys
import threading
import unittest
from grpc._cython import cygrpc
from tests.fork import methods
# New instance of multiprocessing.Process using fork without exec can and will
# hang if the Python process has any other threads running. This includes the
# additional thread spawned by our _runner.py class. So in order to test our
# compatibility with multiprocessing, we first fork+exec a new process to ensure
# we don't have any conflicting background threads.
_CLIENT_FORK_SCRIPT_TEMPLATE = """if True:
import os
import sys
from grpc._cython import cygrpc
from tests.fork import methods
cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'
methods.TestCase.%s.run_test({
'server_host': 'localhost',
'server_port': %d,
'use_tls': False
})
"""
_SUBPROCESS_TIMEOUT_S = 30
@unittest.skipUnless(
    sys.platform.startswith("linux"),
    "not supported on windows, and fork+exec networking blocked on mac")
@unittest.skipUnless(six.PY2, "https://github.com/grpc/grpc/issues/18075")
class ForkInteropTest(unittest.TestCase):

    def setUp(self):
        start_server_script = """if True:
            import sys
            import time

            import grpc
            from src.proto.grpc.testing import test_pb2_grpc
            from tests.interop import methods as interop_methods
            from tests.unit import test_common

            server = test_common.test_server()
            test_pb2_grpc.add_TestServiceServicer_to_server(
                interop_methods.TestService(), server)
            port = server.add_insecure_port('[::]:0')
            server.start()
            print(port)
            sys.stdout.flush()
            while True:
                time.sleep(1)
        """
        self._server_process = subprocess.Popen(
            [sys.executable, '-c', start_server_script],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        timer = threading.Timer(_SUBPROCESS_TIMEOUT_S,
                                self._server_process.kill)
        try:
            timer.start()
            self._port = int(self._server_process.stdout.readline())
        except ValueError:
            raise Exception('Failed to get port from server')
        finally:
            timer.cancel()
    def testConnectivityWatch(self):
        self._verifyTestCase(methods.TestCase.CONNECTIVITY_WATCH)

    def testCloseChannelBeforeFork(self):
        self._verifyTestCase(methods.TestCase.CLOSE_CHANNEL_BEFORE_FORK)

    def testAsyncUnarySameChannel(self):
        self._verifyTestCase(methods.TestCase.ASYNC_UNARY_SAME_CHANNEL)

    def testAsyncUnaryNewChannel(self):
        self._verifyTestCase(methods.TestCase.ASYNC_UNARY_NEW_CHANNEL)

    def testBlockingUnarySameChannel(self):
        self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_SAME_CHANNEL)

    def testBlockingUnaryNewChannel(self):
        self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_NEW_CHANNEL)

    def testInProgressBidiContinueCall(self):
        self._verifyTestCase(methods.TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL)

    def testInProgressBidiSameChannelAsyncCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL)

    def testInProgressBidiSameChannelBlockingCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL)

    def testInProgressBidiNewChannelAsyncCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL)

    def testInProgressBidiNewChannelBlockingCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL)

    def tearDown(self):
        self._server_process.kill()

    def _verifyTestCase(self, test_case):
        script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port)
        process = subprocess.Popen(
            [sys.executable, '-c', script],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, process.kill)
        try:
            timer.start()
            try:
                out, err = process.communicate(timeout=_SUBPROCESS_TIMEOUT_S)
            except TypeError:
                # The timeout parameter was added in Python 3.3.
                out, err = process.communicate()
            except subprocess.TimeoutExpired:
                process.kill()
                raise RuntimeError('Process failed to terminate')
        finally:
            timer.cancel()
        self.assertEqual(
            0, process.returncode,
            'process failed with exit code %d (stdout: %s, stderr: %s)' %
            (process.returncode, out, err))


if __name__ == '__main__':
    unittest.main(verbosity=2)
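The fork+exec indirection described in the comment at the top of this file can be reduced to a small standalone pattern: run the fork-sensitive code in a freshly exec'd interpreter so it starts with exactly one thread, then fork inside it. A hedged sketch, with the worker script contents being illustrative rather than taken from the suite:

import subprocess
import sys

_WORKER_SCRIPT = """if True:
    import multiprocessing

    def child():
        print('child ran in a forked process')

    # Safe to fork here: this interpreter was just exec'd, so it has no
    # background threads inherited from a test runner.
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()
"""

proc = subprocess.Popen([sys.executable, '-c', _WORKER_SCRIPT],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
out, err = proc.communicate()
assert proc.returncode == 0, err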

src/python/grpcio_tests/tests/fork/client.py

@@ -63,12 +63,12 @@ def _test_case_from_arg(test_case_arg):

 def test_fork():
     logging.basicConfig(level=logging.INFO)
-    args = _args()
-    if args.test_case == "all":
+    args = vars(_args())
+    if args['test_case'] == "all":
         for test_case in methods.TestCase:
             test_case.run_test(args)
     else:
-        test_case = _test_case_from_arg(args.test_case)
+        test_case = _test_case_from_arg(args['test_case'])
         test_case.run_test(args)
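The vars(_args()) change is what lets run_test() take a plain dict: the interop-style client still parses flags with argparse, while the new unit test above passes a literal dict. A small illustration of the equivalence (the flag definitions here are placeholders, not necessarily the client's exact ones):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--server_host', default='localhost')
parser.add_argument('--server_port', type=int, default=8080)

# vars() converts the argparse.Namespace into the dict form run_test() expects.
args = vars(parser.parse_args(['--server_port', '50051']))
assert args == {'server_host': 'localhost', 'server_port': 50051}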

src/python/grpcio_tests/tests/fork/methods.py

@@ -30,11 +30,13 @@ from src.proto.grpc.testing import messages_pb2
 from src.proto.grpc.testing import test_pb2_grpc

 _LOGGER = logging.getLogger(__name__)

+_RPC_TIMEOUT_S = 10
+_CHILD_FINISH_TIMEOUT_S = 60


 def _channel(args):
-    target = '{}:{}'.format(args.server_host, args.server_port)
-    if args.use_tls:
+    target = '{}:{}'.format(args['server_host'], args['server_port'])
+    if args['use_tls']:
         channel_credentials = grpc.ssl_channel_credentials()
         channel = grpc.secure_channel(target, channel_credentials)
     else:
@@ -57,7 +59,7 @@ def _async_unary(stub):
         response_type=messages_pb2.COMPRESSABLE,
         response_size=size,
         payload=messages_pb2.Payload(body=b'\x00' * 271828))
-    response_future = stub.UnaryCall.future(request)
+    response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
     response = response_future.result()
     _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
@@ -68,7 +70,7 @@ def _blocking_unary(stub):
         response_type=messages_pb2.COMPRESSABLE,
         response_size=size,
         payload=messages_pb2.Payload(body=b'\x00' * 271828))
-    response = stub.UnaryCall(request)
+    response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S)
     _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
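The new _RPC_TIMEOUT_S bound means an RPC that wedges after a fork fails the test instead of hanging it. For reference, a call that exceeds its deadline surfaces as an RpcError with DEADLINE_EXCEEDED; a minimal sketch (stub and request are placeholders):

import grpc


def unary_call_with_deadline(stub, request, timeout_s=10):
    # Passing timeout= sets the RPC deadline; a hung server or channel turns
    # into a catchable error rather than an indefinite block.
    try:
        return stub.UnaryCall(request, timeout=timeout_s)
    except grpc.RpcError as rpc_error:
        if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise RuntimeError('RPC did not complete within %ds' % timeout_s)
        raise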
@@ -121,6 +123,8 @@ class _ChildProcess(object):
         def record_exceptions():
             try:
                 task(*args)
+            except grpc.RpcError as rpc_error:
+                self._exceptions.put('RpcError: %s' % rpc_error)
             except Exception as e:  # pylint: disable=broad-except
                 self._exceptions.put(e)
@@ -130,7 +134,9 @@
         self._process.start()

     def finish(self):
-        self._process.join()
+        self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
+        if self._process.is_alive():
+            raise RuntimeError('Child process did not terminate')
         if self._process.exitcode != 0:
             raise ValueError('Child process failed with exitcode %d' %
                              self._process.exitcode)
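The bounded join is the parent-side safeguard that pairs with the RPC deadline above: a child that never exits now fails the test after _CHILD_FINISH_TIMEOUT_S instead of blocking the run forever. The pattern in isolation (names are illustrative):

import multiprocessing


def finish(process, timeout_s=60):
    # join() with a timeout returns even if the child is still running, so the
    # liveness check afterwards is what actually detects the hang.
    process.join(timeout=timeout_s)
    if process.is_alive():
        raise RuntimeError('Child process did not terminate')
    if process.exitcode != 0:
        raise ValueError('Child process failed with exitcode %d' %
                         process.exitcode)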
@@ -162,10 +168,10 @@ def _async_unary_same_channel(channel):
 def _async_unary_new_channel(channel, args):

     def child_target():
-        child_channel = _channel(args)
-        child_stub = test_pb2_grpc.TestServiceStub(child_channel)
-        _async_unary(child_stub)
-        child_channel.close()
+        with _channel(args) as child_channel:
+            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
+            _async_unary(child_stub)
+            child_channel.close()

     stub = test_pb2_grpc.TestServiceStub(channel)
     _async_unary(stub)
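The `with _channel(args) as child_channel:` rewrites in this and the following hunks rely on grpc.Channel being a context manager: the channel is closed on exit from the block even if the RPC raises. Roughly equivalent to the explicit form it replaces (target is a placeholder):

import grpc


def close_on_exit_explicit(target):
    # What the `with` form expands to, written as try/finally.
    channel = grpc.insecure_channel(target)
    try:
        pass  # ... issue RPCs on a stub bound to `channel` ...
    finally:
        channel.close()


def close_on_exit_with(target):
    # The same thing, using the context-manager protocol.
    with grpc.insecure_channel(target) as channel:
        pass  # ... issue RPCs ...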
@@ -195,10 +201,9 @@ def _blocking_unary_same_channel(channel):
 def _blocking_unary_new_channel(channel, args):

     def child_target():
-        child_channel = _channel(args)
-        child_stub = test_pb2_grpc.TestServiceStub(child_channel)
-        _blocking_unary(child_stub)
-        child_channel.close()
+        with _channel(args) as child_channel:
+            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
+            _blocking_unary(child_stub)

     stub = test_pb2_grpc.TestServiceStub(channel)
     _blocking_unary(stub)
@@ -213,63 +218,62 @@ def _close_channel_before_fork(channel, args):

     def child_target():
         new_channel.close()
-        child_channel = _channel(args)
-        child_stub = test_pb2_grpc.TestServiceStub(child_channel)
-        _blocking_unary(child_stub)
-        child_channel.close()
+        with _channel(args) as child_channel:
+            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
+            _blocking_unary(child_stub)

     stub = test_pb2_grpc.TestServiceStub(channel)
     _blocking_unary(stub)
     channel.close()

-    new_channel = _channel(args)
-    new_stub = test_pb2_grpc.TestServiceStub(new_channel)
-    child_process = _ChildProcess(child_target)
-    child_process.start()
-    _blocking_unary(new_stub)
-    child_process.finish()
+    with _channel(args) as new_channel:
+        new_stub = test_pb2_grpc.TestServiceStub(new_channel)
+        child_process = _ChildProcess(child_target)
+        child_process.start()
+        _blocking_unary(new_stub)
+        child_process.finish()


 def _connectivity_watch(channel, args):

+    parent_states = []
+    parent_channel_ready_event = threading.Event()
+
     def child_target():

+        child_channel_ready_event = threading.Event()
+
         def child_connectivity_callback(state):
-            child_states.append(state)
+            if state is grpc.ChannelConnectivity.READY:
+                child_channel_ready_event.set()

-        child_states = []
-        child_channel = _channel(args)
-        child_stub = test_pb2_grpc.TestServiceStub(child_channel)
-        child_channel.subscribe(child_connectivity_callback)
-        _async_unary(child_stub)
-        if len(child_states
-              ) < 2 or child_states[-1] != grpc.ChannelConnectivity.READY:
-            raise ValueError('Channel did not move to READY')
-        if len(parent_states) > 1:
-            raise ValueError('Received connectivity updates on parent callback')
-        child_channel.unsubscribe(child_connectivity_callback)
-        child_channel.close()
+        with _channel(args) as child_channel:
+            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
+            child_channel.subscribe(child_connectivity_callback)
+            _async_unary(child_stub)
+            if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
+                raise ValueError('Channel did not move to READY')
+            if len(parent_states) > 1:
+                raise ValueError(
+                    'Received connectivity updates on parent callback',
+                    parent_states)
+            child_channel.unsubscribe(child_connectivity_callback)

     def parent_connectivity_callback(state):
         parent_states.append(state)
+        if state is grpc.ChannelConnectivity.READY:
+            parent_channel_ready_event.set()

-    parent_states = []
     channel.subscribe(parent_connectivity_callback)
     stub = test_pb2_grpc.TestServiceStub(channel)
     child_process = _ChildProcess(child_target)
     child_process.start()
     _async_unary(stub)
-    if len(parent_states
-          ) < 2 or parent_states[-1] != grpc.ChannelConnectivity.READY:
+    if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
         raise ValueError('Channel did not move to READY')
     channel.unsubscribe(parent_connectivity_callback)
     child_process.finish()
+    # Need to unsubscribe or _channel.py in _poll_connectivity triggers a
+    # "Cannot invoke RPC on closed channel!" error.
+    # TODO(ericgribkoff) Fix issue with channel.close() and connectivity polling
+    channel.unsubscribe(parent_connectivity_callback)


 def _ping_pong_with_child_processes_after_first_response(
         channel, args, child_target, run_after_close=True):
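The connectivity test now keys off a threading.Event set from the subscription callback, with a bounded wait, instead of inspecting the length and last element of a list of observed states. The waiting piece in isolation looks roughly like this (assuming an existing grpc.Channel):

import threading

import grpc


def wait_for_ready(channel, timeout_s=10):
    ready_event = threading.Event()

    def on_state_change(state):
        if state is grpc.ChannelConnectivity.READY:
            ready_event.set()

    channel.subscribe(on_state_change, try_to_connect=True)
    try:
        if not ready_event.wait(timeout=timeout_s):
            raise ValueError('Channel did not move to READY')
    finally:
        channel.unsubscribe(on_state_change)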
@@ -380,9 +384,9 @@ def _in_progress_bidi_same_channel_blocking_call(channel):
 def _in_progress_bidi_new_channel_async_call(channel, args):

     def child_target(parent_bidi_call, parent_channel, args):
-        channel = _channel(args)
-        stub = test_pb2_grpc.TestServiceStub(channel)
-        _async_unary(stub)
+        with _channel(args) as channel:
+            stub = test_pb2_grpc.TestServiceStub(channel)
+            _async_unary(stub)

     _ping_pong_with_child_processes_after_first_response(
         channel, args, child_target)
@@ -391,9 +395,9 @@ def _in_progress_bidi_new_channel_async_call(channel, args):
 def _in_progress_bidi_new_channel_blocking_call(channel, args):

     def child_target(parent_bidi_call, parent_channel, args):
-        channel = _channel(args)
-        stub = test_pb2_grpc.TestServiceStub(channel)
-        _blocking_unary(stub)
+        with _channel(args) as channel:
+            stub = test_pb2_grpc.TestServiceStub(channel)
+            _blocking_unary(stub)

     _ping_pong_with_child_processes_after_first_response(
         channel, args, child_target)

src/python/grpcio_tests/tests/tests.json

@@ -1,6 +1,7 @@
 [
     "_sanity._sanity_test.SanityTest",
     "channelz._channelz_servicer_test.ChannelzServicerTest",
+    "fork._fork_interop_test.ForkInteropTest",
     "health_check._health_servicer_test.HealthServicerTest",
     "interop._insecure_intraop_test.InsecureIntraopTest",
     "interop._secure_intraop_test.SecureIntraopTest",
