diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 141116df5dd..3c33b46dbb8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -49,7 +49,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline): cdef _interpret_event(grpc_event c_event): cdef _Tag tag if c_event.type == GRPC_QUEUE_TIMEOUT: - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + # TODO(ericgribkoff) Do not coopt ConnectivityEvent here. return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) elif c_event.type == GRPC_QUEUE_SHUTDOWN: # NOTE(nathaniel): For now we coopt ConnectivityEvent here. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index ce701724fd3..e89e02b171e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -128,7 +128,10 @@ cdef class Server: with nogil: grpc_server_cancel_all_calls(self.c_server) - def __dealloc__(self): + # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any, + # portion of this is safe to call from __dealloc__, and potentially remove + # backup_shutdown_queue. 
+ def destroy(self): if self.c_server != NULL: if not self.is_started: pass @@ -146,4 +149,8 @@ cdef class Server: while not self.is_shutdown: time.sleep(0) grpc_server_destroy(self.c_server) - grpc_shutdown() + self.c_server = NULL + + def __dealloc__(self): + if self.c_server == NULL: + grpc_shutdown() diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 3bbfa47da53..eb750ef1a82 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -48,7 +48,7 @@ _CANCELLED = 'cancelled' _EMPTY_FLAGS = 0 -_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 +_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0 def _serialized_request(request_event): @@ -676,6 +676,9 @@ class _ServerState(object): self.rpc_states = set() self.due = set() + # A "volatile" flag to interrupt the daemon serving thread + self.server_deallocated = False + def _add_generic_handlers(state, generic_handlers): with state.lock: @@ -702,6 +705,7 @@ def _request_call(state): # TODO(https://github.com/grpc/grpc/issues/6597): delete this function. 
def _stop_serving(state): if not state.rpc_states and not state.due: + state.server.destroy() for shutdown_event in state.shutdown_events: shutdown_event.set() state.stage = _ServerStage.STOPPED @@ -715,49 +719,69 @@ def _on_call_completed(state): state.active_rpc_count -= 1 -def _serve(state): - while True: - event = state.completion_queue.poll() - if event.tag is _SHUTDOWN_TAG: +def _process_event_and_continue(state, event): + should_continue = True + if event.tag is _SHUTDOWN_TAG: + with state.lock: + state.due.remove(_SHUTDOWN_TAG) + if _stop_serving(state): + should_continue = False + elif event.tag is _REQUEST_CALL_TAG: + with state.lock: + state.due.remove(_REQUEST_CALL_TAG) + concurrency_exceeded = ( + state.maximum_concurrent_rpcs is not None and + state.active_rpc_count >= state.maximum_concurrent_rpcs) + rpc_state, rpc_future = _handle_call( + event, state.generic_handlers, state.interceptor_pipeline, + state.thread_pool, concurrency_exceeded) + if rpc_state is not None: + state.rpc_states.add(rpc_state) + if rpc_future is not None: + state.active_rpc_count += 1 + rpc_future.add_done_callback( + lambda unused_future: _on_call_completed(state)) + if state.stage is _ServerStage.STARTED: + _request_call(state) + elif _stop_serving(state): + should_continue = False + else: + rpc_state, callbacks = event.tag(event) + for callback in callbacks: + callable_util.call_logging_exceptions(callback, + 'Exception calling callback!') + if rpc_state is not None: with state.lock: - state.due.remove(_SHUTDOWN_TAG) + state.rpc_states.remove(rpc_state) if _stop_serving(state): - return - elif event.tag is _REQUEST_CALL_TAG: - with state.lock: - state.due.remove(_REQUEST_CALL_TAG) - concurrency_exceeded = ( - state.maximum_concurrent_rpcs is not None and - state.active_rpc_count >= state.maximum_concurrent_rpcs) - rpc_state, rpc_future = _handle_call( - event, state.generic_handlers, state.interceptor_pipeline, - state.thread_pool, concurrency_exceeded) - if rpc_state is 
not None: - state.rpc_states.add(rpc_state) - if rpc_future is not None: - state.active_rpc_count += 1 - rpc_future.add_done_callback( - lambda unused_future: _on_call_completed(state)) - if state.stage is _ServerStage.STARTED: - _request_call(state) - elif _stop_serving(state): - return - else: - rpc_state, callbacks = event.tag(event) - for callback in callbacks: - callable_util.call_logging_exceptions( - callback, 'Exception calling callback!') - if rpc_state is not None: - with state.lock: - state.rpc_states.remove(rpc_state) - if _stop_serving(state): - return + should_continue = False + return should_continue + + +def _serve(state): + while True: + timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S + event = state.completion_queue.poll(timeout) + if state.server_deallocated: + _begin_shutdown_once(state) + if event.completion_type != cygrpc.CompletionType.queue_timeout: + if not _process_event_and_continue(state, event): + return # We want to force the deletion of the previous event # ~before~ we poll again; if the event has a reference # to a shutdown Call object, this can induce spinlock. 
event = None +def _begin_shutdown_once(state): + with state.lock: + if state.stage is _ServerStage.STARTED: + state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) + state.stage = _ServerStage.GRACE + state.shutdown_events = [] + state.due.add(_SHUTDOWN_TAG) + + def _stop(state, grace): with state.lock: if state.stage is _ServerStage.STOPPED: @@ -765,11 +789,7 @@ def _stop(state, grace): shutdown_event.set() return shutdown_event else: - if state.stage is _ServerStage.STARTED: - state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) - state.stage = _ServerStage.GRACE - state.shutdown_events = [] - state.due.add(_SHUTDOWN_TAG) + _begin_shutdown_once(state) shutdown_event = threading.Event() state.shutdown_events.append(shutdown_event) if grace is None: @@ -840,7 +860,9 @@ class _Server(grpc.Server): return _stop(self._state, grace) def __del__(self): - _stop(self._state, None) + # We can not grab a lock in __del__(), so set a flag to signal the + # serving daemon thread (if it exists) to initiate shutdown. 
+ self._state.server_deallocated = True def create_server(thread_pool, generic_rpc_handlers, interceptors, options, diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py index d060a4973e4..c63ff5cd842 100644 --- a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py +++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py @@ -88,8 +88,6 @@ def _generate_channel_server_pairs(n): def _close_channel_server_pairs(pairs): for pair in pairs: pair.server.stop(None) - # TODO(ericgribkoff) This del should not be required - del pair.server pair.channel.close() diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index b27e6f26938..f202a3f932f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -57,6 +57,7 @@ "unit._reconnect_test.ReconnectTest", "unit._resource_exhausted_test.ResourceExhaustedTest", "unit._rpc_test.RPCTest", + "unit._server_shutdown_test.ServerShutdown", "unit._server_ssl_cert_config_test.ServerSSLCertConfigFetcherParamsChecks", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 4f850220f8f..1b462ec67a6 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -28,6 +28,7 @@ GRPCIO_TESTS_UNIT = [ # TODO(ghostwriternr): To be added later. 
# "_server_ssl_cert_config_test.py", "_server_test.py", + "_server_shutdown_test.py", "_session_cache_test.py", ] @@ -49,6 +50,11 @@ py_library( srcs = ["_exit_scenarios.py"], ) +py_library( + name = "_server_shutdown_scenarios", + srcs = ["_server_shutdown_scenarios.py"], +) + py_library( name = "_thread_pool", srcs = ["_thread_pool.py"], @@ -70,6 +76,7 @@ py_library( ":resources", ":test_common", ":_exit_scenarios", + ":_server_shutdown_scenarios", ":_thread_pool", ":_from_grpc_import_star", "//src/python/grpcio_tests/tests/unit/framework/common", diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py new file mode 100644 index 00000000000..1d1fdba11ee --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -0,0 +1,97 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Defines a number of module-scope gRPC scenarios to test server shutdown.""" + +import argparse +import os +import threading +import time +import logging + +import grpc +from tests.unit import test_common + +from concurrent import futures +from six.moves import queue + +WAIT_TIME = 1000 + +REQUEST = b'request' +RESPONSE = b'response' + +SERVER_RAISES_EXCEPTION = 'server_raises_exception' +SERVER_DEALLOCATED = 'server_deallocated' +SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' + +FORK_EXIT = '/test/ForkExit' + + +def fork_and_exit(request, servicer_context): + pid = os.fork() + if pid == 0: + os._exit(0) + return RESPONSE + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == FORK_EXIT: + return grpc.unary_unary_rpc_method_handler(fork_and_exit) + else: + return None + + +def run_server(port_queue): + server = test_common.test_server() + port = server.add_insecure_port('[::]:0') + port_queue.put(port) + server.add_generic_rpc_handlers((GenericHandler(),)) + server.start() + # threading.Event.wait() does not exhibit the bug identified in + # https://github.com/grpc/grpc/issues/17093, sleep instead + time.sleep(WAIT_TIME) + + +def run_test(args): + if args.scenario == SERVER_RAISES_EXCEPTION: + server = test_common.test_server() + server.start() + raise Exception() + elif args.scenario == SERVER_DEALLOCATED: + server = test_common.test_server() + server.start() + server.__del__() + while server._state.stage != grpc._server._ServerStage.STOPPED: + pass + elif args.scenario == SERVER_FORK_CAN_EXIT: + port_queue = queue.Queue() + thread = threading.Thread(target=run_server, args=(port_queue,)) + thread.daemon = True + thread.start() + port = port_queue.get() + channel = grpc.insecure_channel('localhost:%d' % port) + multi_callable = channel.unary_unary(FORK_EXIT) + result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) + os.wait() + else: + raise ValueError('unknown test 
scenario') + + +if __name__ == '__main__': + logging.basicConfig() + parser = argparse.ArgumentParser() + parser.add_argument('scenario', type=str) + args = parser.parse_args() + run_test(args) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py new file mode 100644 index 00000000000..47446d65a51 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -0,0 +1,90 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests clean shutdown of server on various interpreter exit conditions. + +The tests in this module spawn a subprocess for each test case, the +test is considered successful if it doesn't hang/timeout. 
+""" + +import atexit +import os +import subprocess +import sys +import threading +import unittest +import logging + +from tests.unit import _server_shutdown_scenarios + +SCENARIO_FILE = os.path.abspath( + os.path.join( + os.path.dirname(os.path.realpath(__file__)), + '_server_shutdown_scenarios.py')) +INTERPRETER = sys.executable +BASE_COMMAND = [INTERPRETER, SCENARIO_FILE] + +processes = [] +process_lock = threading.Lock() + + +# Make sure we attempt to clean up any +# processes we may have left running +def cleanup_processes(): + with process_lock: + for process in processes: + try: + process.kill() + except Exception: # pylint: disable=broad-except + pass + + +atexit.register(cleanup_processes) + + +def wait(process): + with process_lock: + processes.append(process) + process.wait() + + +class ServerShutdown(unittest.TestCase): + + # Currently we shut down a server (if possible) after the Python server + # instance is garbage collected. This behavior may change in the future. + def test_deallocated_server_stops(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + def test_server_exception_exits(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + @unittest.skipIf(os.name == 'nt', 'fork not supported on windows') + def test_server_fork_can_exit(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2)