diff --git a/src/python/interop/interop/_interop_test_case.py b/src/python/interop/interop/_interop_test_case.py index cd6a574e90d..f40ef0ec838 100644 --- a/src/python/interop/interop/_interop_test_case.py +++ b/src/python/interop/interop/_interop_test_case.py @@ -53,3 +53,9 @@ class InteropTestCase(object): def testPingPong(self): methods.TestCase.PING_PONG.test_interoperability(self.stub, None) + + def testCancelAfterBegin(self): + methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability(self.stub, None) + + def testCancelAfterFirstResponse(self): + methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None) diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py index 909b738bd10..194afadb17f 100644 --- a/src/python/interop/interop/methods.py +++ b/src/python/interop/interop/methods.py @@ -219,6 +219,17 @@ def _server_streaming(stub): raise ValueError( 'response body of invalid size %d!' % len(response.payload.body)) +def _cancel_after_begin(stub): + with stub: + sizes = (27182, 8, 1828, 45904) + payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes] + requests = [messages_pb2.StreamingInputCallRequest(payload=payload) + for payload in payloads] + responses = stub.StreamingInputCall.async(requests, _TIMEOUT) + responses.cancel() + if not responses.cancelled(): + raise ValueError('expected call to be cancelled') + class _Pipe(object): @@ -249,13 +260,18 @@ class _Pipe(object): self._open = False self._condition.notify() + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + def _ping_pong(stub): request_response_sizes = (31415, 9, 2653, 58979) request_payload_sizes = (27182, 8, 1828, 45904) - with stub: - pipe = _Pipe() + with stub, _Pipe() as pipe: response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) print 'Starting ping-pong with response iterator %s' % response_iterator for response_size, payload_size in zip( @@ -273,7 +289,33 @@ def _ping_pong(stub): if len(response.payload.body) != response_size: raise ValueError( 'response body of invalid size %d!' % len(response.payload.body)) - pipe.close() + + +def _cancel_after_first_response(stub): + request_response_sizes = (31415, 9, 2653, 58979) + request_payload_sizes = (27182, 8, 1828, 45904) + with stub, _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) + + response_size = request_response_sizes[0] + payload_size = request_payload_sizes[0] + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters( + size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + pipe.add(request) + response = next(response_iterator) + # We test the contents of `response` in the Ping Pong test - don't check + # them here. + response_iterator.cancel() + + try: + next(response_iterator) + except Exception: + pass + else: + raise ValueError('expected call to be cancelled') def _compute_engine_creds(stub, args): @@ -305,6 +347,8 @@ class TestCase(enum.Enum): SERVER_STREAMING = 'server_streaming' CLIENT_STREAMING = 'client_streaming' PING_PONG = 'ping_pong' + CANCEL_AFTER_BEGIN = 'cancel_after_begin' + CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' SERVICE_ACCOUNT_CREDS = 'service_account_creds' @@ -319,6 +363,10 @@ class TestCase(enum.Enum): _client_streaming(stub) elif self is TestCase.PING_PONG: _ping_pong(stub) + elif self is TestCase.CANCEL_AFTER_BEGIN: + _cancel_after_begin(stub) + elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: + _cancel_after_first_response(stub) elif self is TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds(stub, args) elif self is TestCase.SERVICE_ACCOUNT_CREDS: