From 4a9b1c69880db4fa8b41d14c9b91870332e07746 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Thu, 21 Jan 2016 22:26:05 +0000 Subject: [PATCH] Fix a defect in RPC Framework Core On the service-side, an operation isn't successfully completed with just the conclusion of transmission to the other side; local ingestion of the status and code must be completed as well before termination callbacks are called. --- .../grpc/framework/core/_termination.py | 6 +++--- .../unit/framework/common/test_constants.py | 5 ++++- ...e_invocation_asynchronous_event_service.py | 20 ++++++++++++++++++- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py index bdb9147e5b2..364158b2b87 100644 --- a/src/python/grpcio/grpc/framework/core/_termination.py +++ b/src/python/grpcio/grpc/framework/core/_termination.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -46,8 +46,8 @@ def _invocation_completion_predicate( def _service_completion_predicate( unused_emission_complete, transmission_complete, unused_reception_complete, - unused_ingestion_complete): - return transmission_complete + ingestion_complete): + return transmission_complete and ingestion_complete class TerminationManager(_interfaces.TerminationManager): diff --git a/src/python/grpcio/tests/unit/framework/common/test_constants.py b/src/python/grpcio/tests/unit/framework/common/test_constants.py index e1d3c2709d0..9f1fb8471ca 100644 --- a/src/python/grpcio/tests/unit/framework/common/test_constants.py +++ b/src/python/grpcio/tests/unit/framework/common/test_constants.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -49,5 +49,8 @@ STREAM_LENGTH = 200 # The size of payloads to transmit in tests. PAYLOAD_SIZE = 256 * 1024 + 17 +# The parallelism to use in tests of parallel RPCs. +PARALLELISM = 200 + # The size of thread pools to use in tests. POOL_SIZE = 10 diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index c178f2f1083..fc8daa992fa 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -219,6 +219,24 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures = [] + for _ in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures.append(response_future) + + responses = [ + response_future.result() for response_future in response_futures] + + for request, response in zip(requests, responses): + test_messages.verify(request, response, self) + def testParallelInvocations(self): for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()):