diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index d07a7a721d7..b3eeaad1f73 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -436,9 +436,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): """Affords invoking a unary-unary RPC.""" @abc.abstractmethod - def __call__( - self, request, timeout=None, metadata=None, credentials=None, - with_call=False): + def __call__(self, request, timeout=None, metadata=None, credentials=None): """Synchronously invokes the underlying RPC. Args: @@ -447,12 +445,30 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: An optional sequence of pairs of bytes to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. - with_call: Whether or not to include return a Call for the RPC in addition - to the response. Returns: - The response value for the RPC, and a Call for the RPC if with_call was - set to True at invocation. + The response value for the RPC. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + raise NotImplementedError() + + @abc.abstractmethod + def with_call(self, request, timeout=None, metadata=None, credentials=None): + """Synchronously invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: An optional durating of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. + + Returns: + The response value for the RPC and a Call value for the RPC. Raises: RpcError: Indicating that the RPC terminated with non-OK status. The @@ -508,8 +524,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): @abc.abstractmethod def __call__( - self, request_iterator, timeout=None, metadata=None, credentials=None, - with_call=False): + self, request_iterator, timeout=None, metadata=None, credentials=None): """Synchronously invokes the underlying RPC. Args: @@ -518,8 +533,6 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: An optional sequence of pairs of bytes to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. - with_call: Whether or not to include return a Call for the RPC in addition - to the response. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -532,6 +545,28 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): """ raise NotImplementedError() + @abc.abstractmethod + def with_call( + self, request_iterator, timeout=None, metadata=None, credentials=None): + """Synchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. + + Returns: + The response value for the RPC and a Call for the RPC. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + raise NotImplementedError() + @abc.abstractmethod def future( self, request_iterator, timeout=None, metadata=None, credentials=None): diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 7cdd542de27..1f34beeb2cd 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -449,9 +449,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): ) return state, operations, deadline, deadline_timespec, None - def __call__( - self, request, timeout=None, metadata=None, credentials=None, - with_call=False): + def _blocking(self, request, timeout, metadata, credentials): state, operations, deadline, deadline_timespec, rendezvous = self._prepare( request, timeout, metadata) if rendezvous: @@ -464,7 +462,15 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): call.set_credentials(credentials._credentials) call.start_batch(cygrpc.Operations(operations), None) _handle_event(completion_queue.poll(), state, self._response_deserializer) - return _end_unary_response_blocking(state, with_call, deadline) + return state, deadline + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + state, deadline, = self._blocking(request, timeout, metadata, credentials) + return _end_unary_response_blocking(state, False, deadline) + + def with_call(self, request, timeout=None, metadata=None, credentials=None): + state, deadline, = self._blocking(request, timeout, metadata, credentials) + return _end_unary_response_blocking(state, True, deadline) def future(self, request, timeout=None, metadata=None, credentials=None): state, operations, deadline, deadline_timespec, rendezvous = self._prepare( @@ -532,9 +538,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._request_serializer = request_serializer self._response_deserializer = response_deserializer - def __call__( - self, request_iterator, timeout=None, metadata=None, credentials=None, - with_call=False): + def _blocking(self, request_iterator, timeout, metadata, credentials): deadline, deadline_timespec = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) completion_queue = cygrpc.CompletionQueue() @@ -563,7 +567,19 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): state.condition.notify_all() if not state.due: break - return _end_unary_response_blocking(state, with_call, deadline) + return state, deadline + + def __call__( + self, request_iterator, timeout=None, metadata=None, credentials=None): + state, deadline, = self._blocking( + request_iterator, timeout, metadata, credentials) + return _end_unary_response_blocking(state, False, deadline) + + def with_call( + self, request_iterator, timeout=None, metadata=None, credentials=None): + state, deadline, = self._blocking( + request_iterator, timeout, metadata, credentials) + return _end_unary_response_blocking(state, True, deadline) def future( self, request_iterator, timeout=None, metadata=None, credentials=None): diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py index 024808c5404..56456cc117f 100644 --- a/src/python/grpcio/grpc/beta/_client_adaptations.py +++ b/src/python/grpcio/grpc/beta/_client_adaptations.py @@ -186,9 +186,9 @@ def _blocking_unary_unary( response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) if with_call: - response, call = multi_callable( + response, call = multi_callable.with_call( request, timeout=timeout, metadata=effective_metadata, - credentials=_credentials(protocol_options), with_call=True) + credentials=_credentials(protocol_options)) return response, _Rendezvous(None, None, call) else: return multi_callable( @@ -237,9 +237,9 @@ def _blocking_stream_unary( response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) if with_call: - response, call = multi_callable( + response, call = multi_callable.with_call( request_iterator, timeout=timeout, metadata=effective_metadata, - credentials=_credentials(protocol_options), with_call=True) + credentials=_credentials(protocol_options)) return response, _Rendezvous(None, None, call) else: return multi_callable( diff --git a/src/python/grpcio/tests/unit/_metadata_test.py b/src/python/grpcio/tests/unit/_metadata_test.py index 77b39012619..2cb13f236bd 100644 --- a/src/python/grpcio/tests/unit/_metadata_test.py +++ b/src/python/grpcio/tests/unit/_metadata_test.py @@ -173,8 +173,8 @@ class MetadataTest(unittest.TestCase): def testUnaryUnary(self): multi_callable = self._channel.unary_unary(_UNARY_UNARY) - unused_response, call = multi_callable( - _REQUEST, metadata=_CLIENT_METADATA, with_call=True) + unused_response, call = multi_callable.with_call( + _REQUEST, metadata=_CLIENT_METADATA) self.assertTrue(test_common.metadata_transmitted( _SERVER_INITIAL_METADATA, call.initial_metadata())) self.assertTrue(test_common.metadata_transmitted( @@ -192,9 +192,9 @@ class MetadataTest(unittest.TestCase): def testStreamUnary(self): multi_callable = self._channel.stream_unary(_STREAM_UNARY) - unused_response, call = multi_callable( + unused_response, call = multi_callable.with_call( [_REQUEST] * test_constants.STREAM_LENGTH, - metadata=_CLIENT_METADATA, with_call=True) + metadata=_CLIENT_METADATA) self.assertTrue(test_common.metadata_transmitted( _SERVER_INITIAL_METADATA, call.initial_metadata())) self.assertTrue(test_common.metadata_transmitted( diff --git a/src/python/grpcio/tests/unit/_rpc_test.py b/src/python/grpcio/tests/unit/_rpc_test.py index 8407593c86d..9814504edff 100644 --- a/src/python/grpcio/tests/unit/_rpc_test.py +++ b/src/python/grpcio/tests/unit/_rpc_test.py @@ -27,7 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Test of gRPC Python's application-layer API.""" +"""Test of RPCs made against gRPC Python's application-layer API.""" import itertools import threading @@ -216,10 +216,9 @@ class RPCTest(unittest.TestCase): expected_response = self._handler.handle_unary_unary(request, None) multi_callable = _unary_unary_multi_callable(self._channel) - response, call = multi_callable( + response, call = multi_callable.with_call( request, metadata=( - (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),), - with_call=True) + (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),)) self.assertEqual(expected_response, response) self.assertIs(grpc.StatusCode.OK, call.code()) @@ -266,11 +265,11 @@ class RPCTest(unittest.TestCase): request_iterator = iter(requests) multi_callable = _stream_unary_multi_callable(self._channel) - response, call = multi_callable( + response, call = multi_callable.with_call( request_iterator, metadata=( (b'test', b'SuccessfulStreamRequestBlockingUnaryResponseWithCall'), - ), with_call=True) + )) self.assertEqual(expected_response, response) self.assertIs(grpc.StatusCode.OK, call.code()) @@ -525,10 +524,9 @@ class RPCTest(unittest.TestCase): multi_callable = _unary_unary_multi_callable(self._channel) with self._control.pause(): with self.assertRaises(grpc.RpcError) as exception_context: - multi_callable( + multi_callable.with_call( request, timeout=test_constants.SHORT_TIMEOUT, - metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),), - with_call=True) + metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),)) self.assertIsNotNone(exception_context.exception.initial_metadata()) self.assertIs( @@ -640,10 +638,9 @@ class RPCTest(unittest.TestCase): multi_callable = _unary_unary_multi_callable(self._channel) with self._control.fail(): with self.assertRaises(grpc.RpcError) as exception_context: - multi_callable( + multi_callable.with_call( request, - metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),), - with_call=True) + metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),)) self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code())