From 5ee11a8e1263eb00f1ddda0cd00570d9dd1681a0 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Thu, 11 Jun 2015 15:50:13 +0000 Subject: [PATCH] Stability fixes for python_plugin_test The "normal" timeout is eliminated. The "short" timeout is changed to be the length used in tests that will time out as part of their execution and the "long" timeout is changed to be a ridiculously high value that will have no bearing on passing tests. The "pause" behavior of _ServicerMethods is changed to use a threading.Condition's wait/notify methods rather than busy-sleeping. Tests that used servicer delay to verify that asynchronous calls are not affected by server delay are changed to use servicer pause to verify that asynchronous calls return while the servicer is paused. Busy-sleeping in testHalfDuplexCallWedged is replaced with use of a threading.Condition's wait/notify methods. Fixes https://github.com/grpc/grpc/issues/1900. --- test/compiler/python_plugin_test.py | 132 +++++++++++++++------------- 1 file changed, 71 insertions(+), 61 deletions(-) diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py index 653a5ac58c6..0e58d912b9a 100644 --- a/test/compiler/python_plugin_test.py +++ b/test/compiler/python_plugin_test.py @@ -36,6 +36,7 @@ import shutil import subprocess import sys import tempfile +import threading import time import unittest @@ -49,13 +50,13 @@ STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub' SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server' STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub' -# Timeouts and delays. -SHORT_TIMEOUT = 0.1 -NORMAL_TIMEOUT = 1 -LONG_TIMEOUT = 2 -DOES_NOT_MATTER_DELAY = 0 +# The timeout used in tests of RPCs that are supposed to expire. +SHORT_TIMEOUT = 2 +# The timeout used in tests of RPCs that are not supposed to expire. The +# absurdly large value doesn't matter since no passing execution of this test +# module will ever wait the duration. +LONG_TIMEOUT = 600 NO_DELAY = 0 -LONG_DELAY = 1 # Build mode environment variable set by tools/run_tests/run_tests.py. _build_mode = os.environ['CONFIG'] @@ -64,29 +65,36 @@ _build_mode = os.environ['CONFIG'] class _ServicerMethods(object): def __init__(self, test_pb2, delay): + self._condition = threading.Condition() + self._delay = delay self._paused = False - self._failed = False + self._fail = False self._test_pb2 = test_pb2 - self._delay = delay @contextlib.contextmanager def pause(self): # pylint: disable=invalid-name - self._paused = True + with self._condition: + self._paused = True yield - self._paused = False + with self._condition: + self._paused = False + self._condition.notify_all() @contextlib.contextmanager def fail(self): # pylint: disable=invalid-name - self._failed = True + with self._condition: + self._fail = True yield - self._failed = False + with self._condition: + self._fail = False def _control(self): # pylint: disable=invalid-name - if self._failed: - raise ValueError() + with self._condition: + if self._fail: + raise ValueError() + while self._paused: + self._condition.wait() time.sleep(self._delay) - while self._paused: - time.sleep(0) def UnaryCall(self, request, unused_rpc_context): response = self._test_pb2.SimpleResponse() @@ -147,9 +155,8 @@ def _CreateService(test_pb2, delay): waiting for the service. Args: - test_pb2: the test_pb2 module generated by this test - delay: delay in seconds per response from the servicer - timeout: how long the stub will wait for the servicer by default. + test_pb2: The test_pb2 module generated by this test. + delay: Delay in seconds per response from the servicer. Yields: A (servicer_methods, servicer, stub) three-tuple where servicer_methods is @@ -250,7 +257,7 @@ class PythonPluginTest(unittest.TestCase): if exc.errno != errno.ENOENT: raise - # TODO(atash): Figure out which of theses tests is hanging flakily with small + # TODO(atash): Figure out which of these tests is hanging flakily with small # probability. def testImportAttributes(self): @@ -265,34 +272,33 @@ class PythonPluginTest(unittest.TestCase): def testUpDown(self): import test_pb2 with _CreateService( - test_pb2, DOES_NOT_MATTER_DELAY) as (servicer, stub, unused_server): + test_pb2, NO_DELAY) as (servicer, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) def testUnaryCall(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. request = test_pb2.SimpleRequest(response_size=13) - response = stub.UnaryCall(request, NORMAL_TIMEOUT) + response = stub.UnaryCall(request, timeout) expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, LONG_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): - start_time = time.clock() - response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) - # Check that we didn't block on the asynchronous call. - self.assertGreater(LONG_DELAY, time.clock() - start_time) + # Check that the call does not block waiting for the server to respond. + with methods.pause(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) response = response_future.result() expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - # set the timeout super low... - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) with methods.pause(): @@ -305,7 +311,7 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): response_future = stub.UnaryCall.async(request, 1) @@ -315,17 +321,17 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.fail(): - response_future = stub.UnaryCall.async(request, NORMAL_TIMEOUT) + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testStreamingOutputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top request = _streaming_output_request(test_pb2) with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - responses = stub.StreamingOutputCall(request, NORMAL_TIMEOUT) + responses = stub.StreamingOutputCall(request, LONG_TIMEOUT) expected_responses = methods.StreamingOutputCall( request, 'not a real RpcContext!') for expected_response, response in itertools.izip_longest( @@ -337,7 +343,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingOutputCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) @@ -349,7 +355,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingOutputCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( unused_methods, stub, unused_server): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) next(responses) @@ -362,7 +368,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingOutputCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.fail(): responses = stub.StreamingOutputCall(request, 1) @@ -375,20 +381,19 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - response = stub.StreamingInputCall(StreamingInputRequest(test_pb2), - NORMAL_TIMEOUT) + response = stub.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) expected_response = methods.StreamingInputCall( _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, LONG_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): - start_time = time.clock() - response_future = stub.StreamingInputCall.async( - _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) - self.assertGreater(LONG_DELAY, time.clock() - start_time) + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) response = response_future.result() expected_response = methods.StreamingInputCall( _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') @@ -396,8 +401,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - # set the timeout super low... - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): response_future = stub.StreamingInputCall.async( @@ -409,11 +413,12 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. response_future = stub.StreamingInputCall.async( - _streaming_input_request_iterator(test_pb2), NORMAL_TIMEOUT) + _streaming_input_request_iterator(test_pb2), timeout) response_future.cancel() self.assertTrue(response_future.cancelled()) with self.assertRaises(future.CancelledError): @@ -421,7 +426,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.fail(): response_future = stub.StreamingInputCall.async( @@ -432,7 +437,7 @@ class PythonPluginTest(unittest.TestCase): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): responses = stub.FullDuplexCall( - _full_duplex_request_iterator(test_pb2), NORMAL_TIMEOUT) + _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT) expected_responses = methods.FullDuplexCall( _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') for expected_response, response in itertools.izip_longest( @@ -444,7 +449,7 @@ class PythonPluginTest(unittest.TestCase): def testFullDuplexCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top request_iterator = _full_duplex_request_iterator(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) @@ -457,7 +462,7 @@ class PythonPluginTest(unittest.TestCase): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): request_iterator = _full_duplex_request_iterator(test_pb2) - responses = stub.FullDuplexCall(request_iterator, NORMAL_TIMEOUT) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) next(responses) responses.cancel() with self.assertRaises(future.CancelledError): @@ -468,10 +473,10 @@ class PythonPluginTest(unittest.TestCase): def testFullDuplexCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top request_iterator = _full_duplex_request_iterator(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.fail(): - responses = stub.FullDuplexCall(request_iterator, NORMAL_TIMEOUT) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) self.assertIsNotNone(responses) with self.assertRaises(exceptions.ServicerError): next(responses) @@ -480,7 +485,7 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testHalfDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() @@ -491,32 +496,37 @@ class PythonPluginTest(unittest.TestCase): request.response_parameters.add(size=3, interval_us=0) yield request responses = stub.HalfDuplexCall( - half_duplex_request_iterator(), NORMAL_TIMEOUT) + half_duplex_request_iterator(), LONG_TIMEOUT) expected_responses = methods.HalfDuplexCall( - HalfDuplexRequest(), 'not a real RpcContext!') + half_duplex_request_iterator(), 'not a real RpcContext!') for check in itertools.izip_longest(expected_responses, responses): expected_response, response = check self.assertEqual(expected_response, response) def testHalfDuplexCallWedged(self): import test_pb2 # pylint: disable=g-import-not-at-top + condition = threading.Condition() wait_cell = [False] @contextlib.contextmanager def wait(): # pylint: disable=invalid-name # Where's Python 3's 'nonlocal' statement when you need it? - wait_cell[0] = True + with condition: + wait_cell[0] = True yield - wait_cell[0] = False + with condition: + wait_cell[0] = False + condition.notify_all() def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request - while wait_cell[0]: - time.sleep(0.1) + with condition: + while wait_cell[0]: + condition.wait() with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): with wait(): responses = stub.HalfDuplexCall( - half_duplex_request_iterator(), NORMAL_TIMEOUT) + half_duplex_request_iterator(), SHORT_TIMEOUT) # half-duplex waits for the client to send all info with self.assertRaises(exceptions.ExpirationError): next(responses)