pull/17444/head
Eric Gribkoff 6 years ago
parent 082b63e095
commit a76d72e0a6
  1. 1
      src/python/grpcio_tests/tests/tests.json
  2. 113
      src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py
  3. 87
      src/python/grpcio_tests/tests/unit/_server_shutdown_test.py

@ -57,6 +57,7 @@
"unit._reconnect_test.ReconnectTest",
"unit._resource_exhausted_test.ResourceExhaustedTest",
"unit._rpc_test.RPCTest",
"unit._server_shutdown_test.ServerShutdown",
"unit._server_ssl_cert_config_test.ServerSSLCertConfigFetcherParamsChecks",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth",

@ -0,0 +1,113 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines a number of module-scope gRPC scenarios to test server shutdown."""
import argparse
import os
import threading
import time
import logging
import grpc
from concurrent import futures
from six.moves import queue
WAIT_TIME = 1000
REQUEST = b'request'
RESPONSE = b'response'
SERVER_RAISES_EXCEPTION = 'server_raises_exception'
SERVER_DEALLOCATED = 'server_deallocated'
SERVER_FORK_CAN_EXIT = 'server_fork_can_exit'
FORK_EXIT = '/test/ForkExit'
class ForkExitHandler(object):
def unary_unary(self, request, servicer_context):
pid = os.fork()
if pid == 0:
os._exit(0)
return RESPONSE
def __init__(self):
self.request_streaming = None
self.response_streaming = None
self.request_deserializer = None
self.response_serializer = None
self.unary_stream = None
self.stream_unary = None
self.stream_stream = None
class GenericHandler(grpc.GenericRpcHandler):
def service(self, handler_call_details):
if handler_call_details.method == FORK_EXIT:
return ForkExitHandler()
else:
return None
def run_server(port_queue,):
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
options=(('grpc.so_reuseport', 0),))
port = server.add_insecure_port('[::]:0')
port_queue.put(port)
server.add_generic_rpc_handlers((GenericHandler(),))
server.start()
# threading.Event.wait() does not exhibit the bug identified in
# https://github.com/grpc/grpc/issues/17093, sleep instead
time.sleep(WAIT_TIME)
def run_test(args):
if args.scenario == SERVER_RAISES_EXCEPTION:
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(('grpc.so_reuseport', 0),))
server.start()
raise Exception()
elif args.scenario == SERVER_DEALLOCATED:
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(('grpc.so_reuseport', 0),))
server.start()
server.__del__()
while server._state.stage != grpc._server._ServerStage.STOPPED:
pass
elif args.scenario == SERVER_FORK_CAN_EXIT:
port_queue = queue.Queue()
thread = threading.Thread(target=run_server, args=(port_queue,))
thread.daemon = True
thread.start()
port = port_queue.get()
channel = grpc.insecure_channel('[::]:%d' % port)
multi_callable = channel.unary_unary(FORK_EXIT)
result, call = multi_callable.with_call(REQUEST, wait_for_ready=True)
os.wait()
else:
raise ValueError('unknown test scenario')
if __name__ == '__main__':
logging.basicConfig()
parser = argparse.ArgumentParser()
parser.add_argument('scenario', type=str)
args = parser.parse_args()
run_test(args)

@ -0,0 +1,87 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests clean shutdown of server on various interpreter exit conditions.
The tests in this module spawn a subprocess for each test case, the
test is considered successful if it doesn't hang/timeout.
"""
import atexit
import os
import subprocess
import sys
import threading
import unittest
import logging
from tests.unit import _server_shutdown_scenarios
SCENARIO_FILE = os.path.abspath(
os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'_server_shutdown_scenarios.py'))
INTERPRETER = sys.executable
BASE_COMMAND = [INTERPRETER, SCENARIO_FILE]
processes = []
process_lock = threading.Lock()
# Make sure we attempt to clean up any
# processes we may have left running
def cleanup_processes():
with process_lock:
for process in processes:
try:
process.kill()
except Exception: # pylint: disable=broad-except
pass
atexit.register(cleanup_processes)
def wait(process):
with process_lock:
processes.append(process)
process.wait()
class ServerShutdown(unittest.TestCase):
def test_deallocated_server_stops(self):
process = subprocess.Popen(
BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED],
stdout=sys.stdout,
stderr=sys.stderr)
wait(process)
def test_server_exception_exits(self):
process = subprocess.Popen(
BASE_COMMAND + [_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION],
stdout=sys.stdout,
stderr=sys.stderr)
wait(process)
def test_server_fork_can_exit(self):
process = subprocess.Popen(
BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT],
stdout=sys.stdout,
stderr=sys.stderr)
wait(process)
if __name__ == '__main__':
logging.basicConfig()
unittest.main(verbosity=2)
Loading…
Cancel
Save