diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py index 367effdb1a3..653a5ac58c6 100644 --- a/test/compiler/python_plugin_test.py +++ b/test/compiler/python_plugin_test.py @@ -66,8 +66,8 @@ class _ServicerMethods(object): def __init__(self, test_pb2, delay): self._paused = False self._failed = False - self.test_pb2 = test_pb2 - self.delay = delay + self._test_pb2 = test_pb2 + self._delay = delay @contextlib.contextmanager def pause(self): # pylint: disable=invalid-name @@ -84,27 +84,27 @@ class _ServicerMethods(object): def _control(self): # pylint: disable=invalid-name if self._failed: raise ValueError() - time.sleep(self.delay) + time.sleep(self._delay) while self._paused: time.sleep(0) - def UnaryCall(self, request, unused_context): - response = self.test_pb2.SimpleResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + def UnaryCall(self, request, unused_rpc_context): + response = self._test_pb2.SimpleResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * request.response_size self._control() return response - def StreamingOutputCall(self, request, unused_context): + def StreamingOutputCall(self, request, unused_rpc_context): for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() yield response - def StreamingInputCall(self, request_iter, unused_context): - response = self.test_pb2.StreamingInputCallResponse() + def StreamingInputCall(self, request_iter, unused_rpc_context): + response = self._test_pb2.StreamingInputCallResponse() aggregated_payload_size = 0 for request in request_iter: aggregated_payload_size += len(request.payload.payload_compressable) @@ -112,21 +112,21 @@ class _ServicerMethods(object): self._control() return response - def FullDuplexCall(self, request_iter, unused_context): + def FullDuplexCall(self, request_iter, unused_rpc_context): for request in request_iter: for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() yield response - def HalfDuplexCall(self, request_iter, unused_context): + def HalfDuplexCall(self, request_iter, unused_rpc_context): responses = [] for request in request_iter: for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() responses.append(response) @@ -152,7 +152,7 @@ def _CreateService(test_pb2, delay): timeout: how long the stub will wait for the servicer by default. Yields: - A three-tuple (servicer_methods, servicer, stub), where the servicer is + A (servicer_methods, servicer, stub) three-tuple where servicer_methods is the back-end of the service bound to the stub and the server and stub are both activated and ready for use. """ @@ -185,7 +185,7 @@ def _CreateService(test_pb2, delay): yield servicer_methods, stub, server -def StreamingInputRequest(test_pb2): +def _streaming_input_request_iterator(test_pb2): for _ in range(3): request = test_pb2.StreamingInputCallRequest() request.payload.payload_type = test_pb2.COMPRESSABLE @@ -193,7 +193,7 @@ def StreamingInputRequest(test_pb2): yield request -def StreamingOutputRequest(test_pb2): +def _streaming_output_request(test_pb2): request = test_pb2.StreamingOutputCallRequest() sizes = [1, 2, 3] request.response_parameters.add(size=sizes[0], interval_us=0) @@ -202,7 +202,7 @@ def StreamingOutputRequest(test_pb2): return request -def FullDuplexRequest(test_pb2): +def _full_duplex_request_iterator(test_pb2): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request @@ -270,32 +270,32 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) response = stub.UnaryCall(request, NORMAL_TIMEOUT) - expected_response = servicer.UnaryCall(request, None) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) with _CreateService(test_pb2, LONG_DELAY) as ( - servicer, stub, unused_server): + methods, stub, unused_server): start_time = time.clock() response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) # Check that we didn't block on the asynchronous call. self.assertGreater(LONG_DELAY, time.clock() - start_time) response = response_future.result() - expected_response = servicer.UnaryCall(request, None) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top # set the timeout super low... with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): + methods, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) - with servicer.pause(): + with methods.pause(): response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): response_future.result() @@ -306,8 +306,8 @@ class PythonPluginTest(unittest.TestCase): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + methods, stub, unused_server): + with methods.pause(): response_future = stub.UnaryCall.async(request, 1) response_future.cancel() self.assertTrue(response_future.cancelled()) @@ -316,29 +316,30 @@ class PythonPluginTest(unittest.TestCase): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): + methods, stub, unused_server): + with methods.fail(): response_future = stub.UnaryCall.async(request, NORMAL_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testStreamingOutputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): responses = stub.StreamingOutputCall(request, NORMAL_TIMEOUT) - expected_responses = servicer.StreamingOutputCall(request, None) - for check in itertools.izip_longest(expected_responses, responses): - expected_response, response = check + expected_responses = methods.StreamingOutputCall( + request, 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' 'forever and fix.') def testStreamingOutputCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) + request = _streaming_output_request(test_pb2) with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + methods, stub, unused_server): + with methods.pause(): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(responses) @@ -347,9 +348,9 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testStreamingOutputCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) + request = _streaming_output_request(test_pb2) with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - unused_servicer, stub, unused_server): + unused_methods, stub, unused_server): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) next(responses) responses.cancel() @@ -360,10 +361,10 @@ class PythonPluginTest(unittest.TestCase): 'instead of raising the proper error.') def testStreamingOutputCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) + request = _streaming_output_request(test_pb2) with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): + methods, stub, unused_server): + with methods.fail(): responses = stub.StreamingOutputCall(request, 1) self.assertIsNotNone(responses) with self.assertRaises(exceptions.ServicerError): @@ -373,34 +374,34 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testStreamingInputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): response = stub.StreamingInputCall(StreamingInputRequest(test_pb2), NORMAL_TIMEOUT) - expected_response = servicer.StreamingInputCall( - StreamingInputRequest(test_pb2), None) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, LONG_DELAY) as ( - servicer, stub, unused_server): + methods, stub, unused_server): start_time = time.clock() response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), LONG_TIMEOUT) + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) self.assertGreater(LONG_DELAY, time.clock() - start_time) response = response_future.result() - expected_response = servicer.StreamingInputCall( - StreamingInputRequest(test_pb2), None) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top # set the timeout super low... with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + methods, stub, unused_server): + with methods.pause(): response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), SHORT_TIMEOUT) + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): response_future.result() self.assertIsInstance( @@ -409,10 +410,10 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + methods, stub, unused_server): + with methods.pause(): response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), NORMAL_TIMEOUT) + _streaming_input_request_iterator(test_pb2), NORMAL_TIMEOUT) response_future.cancel() self.assertTrue(response_future.cancelled()) with self.assertRaises(future.CancelledError): @@ -421,32 +422,32 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): + methods, stub, unused_server): + with methods.fail(): response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), SHORT_TIMEOUT) + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testFullDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - responses = stub.FullDuplexCall(FullDuplexRequest(test_pb2), - NORMAL_TIMEOUT) - expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2), - None) - for check in itertools.izip_longest(expected_responses, responses): - expected_response, response = check + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), NORMAL_TIMEOUT) + expected_responses = methods.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' 'forever and fix.') def testFullDuplexCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = FullDuplexRequest(test_pb2) + request_iterator = _full_duplex_request_iterator(test_pb2) with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): - responses = stub.FullDuplexCall(request, SHORT_TIMEOUT) + methods, stub, unused_server): + with methods.pause(): + responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(responses) @@ -454,9 +455,9 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testFullDuplexCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - request = FullDuplexRequest(test_pb2) - responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + request_iterator = _full_duplex_request_iterator(test_pb2) + responses = stub.FullDuplexCall(request_iterator, NORMAL_TIMEOUT) next(responses) responses.cancel() with self.assertRaises(future.CancelledError): @@ -466,11 +467,11 @@ class PythonPluginTest(unittest.TestCase): 'and fix.') def testFullDuplexCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = FullDuplexRequest(test_pb2) + request_iterator = _full_duplex_request_iterator(test_pb2) with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): - responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT) + methods, stub, unused_server): + with methods.fail(): + responses = stub.FullDuplexCall(request_iterator, NORMAL_TIMEOUT) self.assertIsNotNone(responses) with self.assertRaises(exceptions.ServicerError): next(responses) @@ -480,8 +481,8 @@ class PythonPluginTest(unittest.TestCase): def testHalfDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - def HalfDuplexRequest(): + methods, stub, unused_server): + def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request @@ -489,30 +490,33 @@ class PythonPluginTest(unittest.TestCase): request.response_parameters.add(size=2, interval_us=0) request.response_parameters.add(size=3, interval_us=0) yield request - responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT) - expected_responses = servicer.HalfDuplexCall(HalfDuplexRequest(), None) + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), NORMAL_TIMEOUT) + expected_responses = methods.HalfDuplexCall( + HalfDuplexRequest(), 'not a real RpcContext!') for check in itertools.izip_longest(expected_responses, responses): expected_response, response = check self.assertEqual(expected_response, response) def testHalfDuplexCallWedged(self): import test_pb2 # pylint: disable=g-import-not-at-top - wait_flag = [False] + wait_cell = [False] @contextlib.contextmanager def wait(): # pylint: disable=invalid-name # Where's Python 3's 'nonlocal' statement when you need it? - wait_flag[0] = True + wait_cell[0] = True yield - wait_flag[0] = False - def HalfDuplexRequest(): + wait_cell[0] = False + def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request - while wait_flag[0]: + while wait_cell[0]: time.sleep(0.1) - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): with wait(): - responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT) + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), NORMAL_TIMEOUT) # half-duplex waits for the client to send all info with self.assertRaises(exceptions.ExpirationError): next(responses)