From 082b63e09592502b0900a5dff214dc70efb9de56 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Fri, 7 Dec 2018 09:23:54 -0800 Subject: [PATCH 1/7] Refactor server deallocation --- .../_cython/_cygrpc/completion_queue.pyx.pxi | 2 +- .../grpc/_cython/_cygrpc/server.pyx.pxi | 10 +- src/python/grpcio/grpc/_server.py | 106 +++++++++++------- .../tests/channelz/_channelz_servicer_test.py | 2 - 4 files changed, 73 insertions(+), 47 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 141116df5dd..3c33b46dbb8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -49,7 +49,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline): cdef _interpret_event(grpc_event c_event): cdef _Tag tag if c_event.type == GRPC_QUEUE_TIMEOUT: - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + # TODO(ericgribkoff) Do not coopt ConnectivityEvent here. return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) elif c_event.type == GRPC_QUEUE_SHUTDOWN: # NOTE(nathaniel): For now we coopt ConnectivityEvent here. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index ce701724fd3..1b8d6790fb3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -128,7 +128,9 @@ cdef class Server: with nogil: grpc_server_cancel_all_calls(self.c_server) - def __dealloc__(self): + # TODO(ericgribkoff) Determine what, if any, portion of this is safe to call + # from __dealloc__, and potentially remove backup_shutdown_queue. + def destroy(self): if self.c_server != NULL: if not self.is_started: pass @@ -146,4 +148,8 @@ cdef class Server: while not self.is_shutdown: time.sleep(0) grpc_server_destroy(self.c_server) - grpc_shutdown() + self.c_server = NULL + + def __dealloc(self): + if self.c_server == NULL: + grpc_shutdown() diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 3bbfa47da53..eb750ef1a82 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -48,7 +48,7 @@ _CANCELLED = 'cancelled' _EMPTY_FLAGS = 0 -_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 +_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0 def _serialized_request(request_event): @@ -676,6 +676,9 @@ class _ServerState(object): self.rpc_states = set() self.due = set() + # A "volatile" flag to interrupt the daemon serving thread + self.server_deallocated = False + def _add_generic_handlers(state, generic_handlers): with state.lock: @@ -702,6 +705,7 @@ def _request_call(state): # TODO(https://github.com/grpc/grpc/issues/6597): delete this function. def _stop_serving(state): if not state.rpc_states and not state.due: + state.server.destroy() for shutdown_event in state.shutdown_events: shutdown_event.set() state.stage = _ServerStage.STOPPED @@ -715,49 +719,69 @@ def _on_call_completed(state): state.active_rpc_count -= 1 -def _serve(state): - while True: - event = state.completion_queue.poll() - if event.tag is _SHUTDOWN_TAG: +def _process_event_and_continue(state, event): + should_continue = True + if event.tag is _SHUTDOWN_TAG: + with state.lock: + state.due.remove(_SHUTDOWN_TAG) + if _stop_serving(state): + should_continue = False + elif event.tag is _REQUEST_CALL_TAG: + with state.lock: + state.due.remove(_REQUEST_CALL_TAG) + concurrency_exceeded = ( + state.maximum_concurrent_rpcs is not None and + state.active_rpc_count >= state.maximum_concurrent_rpcs) + rpc_state, rpc_future = _handle_call( + event, state.generic_handlers, state.interceptor_pipeline, + state.thread_pool, concurrency_exceeded) + if rpc_state is not None: + state.rpc_states.add(rpc_state) + if rpc_future is not None: + state.active_rpc_count += 1 + rpc_future.add_done_callback( + lambda unused_future: _on_call_completed(state)) + if state.stage is _ServerStage.STARTED: + _request_call(state) + elif _stop_serving(state): + should_continue = False + else: + rpc_state, callbacks = event.tag(event) + for callback in callbacks: + callable_util.call_logging_exceptions(callback, + 'Exception calling callback!') + if rpc_state is not None: with state.lock: - state.due.remove(_SHUTDOWN_TAG) + state.rpc_states.remove(rpc_state) if _stop_serving(state): - return - elif event.tag is _REQUEST_CALL_TAG: - with state.lock: - state.due.remove(_REQUEST_CALL_TAG) - concurrency_exceeded = ( - state.maximum_concurrent_rpcs is not None and - state.active_rpc_count >= state.maximum_concurrent_rpcs) - rpc_state, rpc_future = _handle_call( - event, state.generic_handlers, state.interceptor_pipeline, - state.thread_pool, concurrency_exceeded) - if rpc_state is not None: - state.rpc_states.add(rpc_state) - if rpc_future is not None: - state.active_rpc_count += 1 - rpc_future.add_done_callback( - lambda unused_future: _on_call_completed(state)) - if state.stage is _ServerStage.STARTED: - _request_call(state) - elif _stop_serving(state): - return - else: - rpc_state, callbacks = event.tag(event) - for callback in callbacks: - callable_util.call_logging_exceptions( - callback, 'Exception calling callback!') - if rpc_state is not None: - with state.lock: - state.rpc_states.remove(rpc_state) - if _stop_serving(state): - return + should_continue = False + return should_continue + + +def _serve(state): + while True: + timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S + event = state.completion_queue.poll(timeout) + if state.server_deallocated: + _begin_shutdown_once(state) + if event.completion_type != cygrpc.CompletionType.queue_timeout: + if not _process_event_and_continue(state, event): + return # We want to force the deletion of the previous event # ~before~ we poll again; if the event has a reference # to a shutdown Call object, this can induce spinlock. event = None +def _begin_shutdown_once(state): + with state.lock: + if state.stage is _ServerStage.STARTED: + state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) + state.stage = _ServerStage.GRACE + state.shutdown_events = [] + state.due.add(_SHUTDOWN_TAG) + + def _stop(state, grace): with state.lock: if state.stage is _ServerStage.STOPPED: @@ -765,11 +789,7 @@ def _stop(state, grace): shutdown_event.set() return shutdown_event else: - if state.stage is _ServerStage.STARTED: - state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) - state.stage = _ServerStage.GRACE - state.shutdown_events = [] - state.due.add(_SHUTDOWN_TAG) + _begin_shutdown_once(state) shutdown_event = threading.Event() state.shutdown_events.append(shutdown_event) if grace is None: @@ -840,7 +860,9 @@ class _Server(grpc.Server): return _stop(self._state, grace) def __del__(self): - _stop(self._state, None) + # We can not grab a lock in __del__(), so set a flag to signal the + # serving daemon thread (if it exists) to initiate shutdown. + self._state.server_deallocated = True def create_server(thread_pool, generic_rpc_handlers, interceptors, options, diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py index 8ca51895224..5265911a0be 100644 --- a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py +++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py @@ -88,8 +88,6 @@ def _generate_channel_server_pairs(n): def _close_channel_server_pairs(pairs): for pair in pairs: pair.server.stop(None) - # TODO(ericgribkoff) This del should not be required - del pair.server pair.channel.close() From a76d72e0a62d512f919f0fa79e24809427c9e4e7 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Fri, 14 Dec 2018 17:41:02 -0800 Subject: [PATCH 2/7] add tests --- src/python/grpcio_tests/tests/tests.json | 1 + .../tests/unit/_server_shutdown_scenarios.py | 113 ++++++++++++++++++ .../tests/unit/_server_shutdown_test.py | 87 ++++++++++++++ 3 files changed, 201 insertions(+) create mode 100644 src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py create mode 100644 src/python/grpcio_tests/tests/unit/_server_shutdown_test.py diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index b27e6f26938..f202a3f932f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -57,6 +57,7 @@ "unit._reconnect_test.ReconnectTest", "unit._resource_exhausted_test.ResourceExhaustedTest", "unit._rpc_test.RPCTest", + "unit._server_shutdown_test.ServerShutdown", "unit._server_ssl_cert_config_test.ServerSSLCertConfigFetcherParamsChecks", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py new file mode 100644 index 00000000000..da8cef0c430 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -0,0 +1,113 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines a number of module-scope gRPC scenarios to test server shutdown.""" + +import argparse +import os +import threading +import time +import logging + +import grpc + +from concurrent import futures +from six.moves import queue + +WAIT_TIME = 1000 + +REQUEST = b'request' +RESPONSE = b'response' + +SERVER_RAISES_EXCEPTION = 'server_raises_exception' +SERVER_DEALLOCATED = 'server_deallocated' +SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' + +FORK_EXIT = '/test/ForkExit' + + +class ForkExitHandler(object): + + def unary_unary(self, request, servicer_context): + pid = os.fork() + if pid == 0: + os._exit(0) + return RESPONSE + + def __init__(self): + self.request_streaming = None + self.response_streaming = None + self.request_deserializer = None + self.response_serializer = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == FORK_EXIT: + return ForkExitHandler() + else: + return None + + +def run_server(port_queue,): + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + options=(('grpc.so_reuseport', 0),)) + port = server.add_insecure_port('[::]:0') + port_queue.put(port) + server.add_generic_rpc_handlers((GenericHandler(),)) + server.start() + # threading.Event.wait() does not exhibit the bug identified in + # https://github.com/grpc/grpc/issues/17093, sleep instead + time.sleep(WAIT_TIME) + + +def run_test(args): + if args.scenario == SERVER_RAISES_EXCEPTION: + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(('grpc.so_reuseport', 0),)) + server.start() + raise Exception() + elif args.scenario == SERVER_DEALLOCATED: + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(('grpc.so_reuseport', 0),)) + server.start() + server.__del__() + while server._state.stage != grpc._server._ServerStage.STOPPED: + pass + elif args.scenario == SERVER_FORK_CAN_EXIT: + port_queue = queue.Queue() + thread = threading.Thread(target=run_server, args=(port_queue,)) + thread.daemon = True + thread.start() + port = port_queue.get() + channel = grpc.insecure_channel('[::]:%d' % port) + multi_callable = channel.unary_unary(FORK_EXIT) + result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) + os.wait() + else: + raise ValueError('unknown test scenario') + + +if __name__ == '__main__': + logging.basicConfig() + parser = argparse.ArgumentParser() + parser.add_argument('scenario', type=str) + args = parser.parse_args() + run_test(args) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py new file mode 100644 index 00000000000..25d7f5e8198 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -0,0 +1,87 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests clean shutdown of server on various interpreter exit conditions. + +The tests in this module spawn a subprocess for each test case, the +test is considered successful if it doesn't hang/timeout. +""" + +import atexit +import os +import subprocess +import sys +import threading +import unittest +import logging + +from tests.unit import _server_shutdown_scenarios + +SCENARIO_FILE = os.path.abspath( + os.path.join( + os.path.dirname(os.path.realpath(__file__)), + '_server_shutdown_scenarios.py')) +INTERPRETER = sys.executable +BASE_COMMAND = [INTERPRETER, SCENARIO_FILE] + +processes = [] +process_lock = threading.Lock() + + +# Make sure we attempt to clean up any +# processes we may have left running +def cleanup_processes(): + with process_lock: + for process in processes: + try: + process.kill() + except Exception: # pylint: disable=broad-except + pass + + +atexit.register(cleanup_processes) + + +def wait(process): + with process_lock: + processes.append(process) + process.wait() + + +class ServerShutdown(unittest.TestCase): + + def test_deallocated_server_stops(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + def test_server_exception_exits(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + def test_server_fork_can_exit(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) From 468ae0f48615a58d8f09b76ca142fee27563964a Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Sat, 15 Dec 2018 08:36:32 -0800 Subject: [PATCH 3/7] add tracking issue --- src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 1b8d6790fb3..e89e02b171e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -128,8 +128,9 @@ cdef class Server: with nogil: grpc_server_cancel_all_calls(self.c_server) - # TODO(ericgribkoff) Determine what, if any, portion of this is safe to call - # from __dealloc__, and potentially remove backup_shutdown_queue. + # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any, + # portion of this is safe to call from __dealloc__, and potentially remove + # backup_shutdown_queue. def destroy(self): if self.c_server != NULL: if not self.is_started: From 718084c6b4aac0099d52a0dcf4c529e5b46ee686 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Sun, 16 Dec 2018 19:03:18 -0800 Subject: [PATCH 4/7] disable fork test on windows --- .../grpcio_tests/tests/unit/_server_shutdown_scenarios.py | 2 +- src/python/grpcio_tests/tests/unit/_server_shutdown_test.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py index da8cef0c430..acbec0f77dd 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -63,7 +63,7 @@ class GenericHandler(grpc.GenericRpcHandler): return None -def run_server(port_queue,): +def run_server(port_queue): server = grpc.server( futures.ThreadPoolExecutor(max_workers=10), options=(('grpc.so_reuseport', 0),)) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py index 25d7f5e8198..0e4591c5e56 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -74,6 +74,7 @@ class ServerShutdown(unittest.TestCase): stderr=sys.stderr) wait(process) + @unittest.skipIf(os.name == 'nt', 'fork not supported on windows') def test_server_fork_can_exit(self): process = subprocess.Popen( BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT], From 05d3ab2852cb2d7ce13f7ca059b36884e37175fd Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 17 Dec 2018 14:54:10 -0800 Subject: [PATCH 5/7] Address comments, improve tests --- .../grpcio_tests/tests/unit/BUILD.bazel | 1 + .../tests/unit/_server_shutdown_scenarios.py | 36 ++++++------------- .../tests/unit/_server_shutdown_test.py | 2 ++ 3 files changed, 13 insertions(+), 26 deletions(-) diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 4f850220f8f..7a910fd9feb 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -28,6 +28,7 @@ GRPCIO_TESTS_UNIT = [ # TODO(ghostwriternr): To be added later. # "_server_ssl_cert_config_test.py", "_server_test.py", + "_server_shutdown_test.py", "_session_cache_test.py", ] diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py index acbec0f77dd..1797e9bbfec 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -20,6 +20,7 @@ import time import logging import grpc +from tests.unit import test_common from concurrent import futures from six.moves import queue @@ -36,37 +37,24 @@ SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' FORK_EXIT = '/test/ForkExit' -class ForkExitHandler(object): - - def unary_unary(self, request, servicer_context): - pid = os.fork() - if pid == 0: - os._exit(0) - return RESPONSE - - def __init__(self): - self.request_streaming = None - self.response_streaming = None - self.request_deserializer = None - self.response_serializer = None - self.unary_stream = None - self.stream_unary = None - self.stream_stream = None +def fork_and_exit(request, servicer_context): + pid = os.fork() + if pid == 0: + os._exit(0) + return RESPONSE class GenericHandler(grpc.GenericRpcHandler): def service(self, handler_call_details): if handler_call_details.method == FORK_EXIT: - return ForkExitHandler() + return grpc.unary_unary_rpc_method_handler(fork_and_exit) else: return None def run_server(port_queue): - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=10), - options=(('grpc.so_reuseport', 0),)) + server = test_common.test_server() port = server.add_insecure_port('[::]:0') port_queue.put(port) server.add_generic_rpc_handlers((GenericHandler(),)) @@ -78,15 +66,11 @@ def run_server(port_queue): def run_test(args): if args.scenario == SERVER_RAISES_EXCEPTION: - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=1), - options=(('grpc.so_reuseport', 0),)) + server = test_common.test_server() server.start() raise Exception() elif args.scenario == SERVER_DEALLOCATED: - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=1), - options=(('grpc.so_reuseport', 0),)) + server = test_common.test_server() server.start() server.__del__() while server._state.stage != grpc._server._ServerStage.STOPPED: diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py index 0e4591c5e56..47446d65a51 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -60,6 +60,8 @@ def wait(process): class ServerShutdown(unittest.TestCase): + # Currently we shut down a server (if possible) after the Python server + # instance is garbage collected. This behavior may change in the future. def test_deallocated_server_stops(self): process = subprocess.Popen( BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED], From 8183fe12992ac36fd99b637422d6bf158e760036 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 17 Dec 2018 18:29:25 -0800 Subject: [PATCH 6/7] fix BUILD.bazel --- src/python/grpcio_tests/tests/unit/BUILD.bazel | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 7a910fd9feb..1b462ec67a6 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -50,6 +50,11 @@ py_library( srcs = ["_exit_scenarios.py"], ) +py_library( + name = "_server_shutdown_scenarios", + srcs = ["_server_shutdown_scenarios.py"], +) + py_library( name = "_thread_pool", srcs = ["_thread_pool.py"], @@ -71,6 +76,7 @@ py_library( ":resources", ":test_common", ":_exit_scenarios", + ":_server_shutdown_scenarios", ":_thread_pool", ":_from_grpc_import_star", "//src/python/grpcio_tests/tests/unit/framework/common", From 3c49252d47f149c8b3262f9a1db83f11340e368b Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 17 Dec 2018 21:05:13 -0800 Subject: [PATCH 7/7] bazel docker image does not support ipv6 --- .../grpcio_tests/tests/unit/_server_shutdown_scenarios.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py index 1797e9bbfec..1d1fdba11ee 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -81,7 +81,7 @@ def run_test(args): thread.daemon = True thread.start() port = port_queue.get() - channel = grpc.insecure_channel('[::]:%d' % port) + channel = grpc.insecure_channel('localhost:%d' % port) multi_callable = channel.unary_unary(FORK_EXIT) result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) os.wait()