From 331fcb9dc314ab4560adada3813b34d3ee33c7eb Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Thu, 19 Feb 2015 22:03:26 +0000 Subject: [PATCH] Add SyncAsync interfaces and implementations. These invocation-side values allow users to choose between synchronous and asynchronous execution of their RPCs with a single attribute access. --- .../grpc/framework/face/implementations.py | 38 ++++++ .../src/grpc/framework/face/interfaces.py | 114 ++++++++++++++++++ ...ing_invocation_inline_service_test_case.py | 6 +- ...on_asynchronous_event_service_test_case.py | 8 +- 4 files changed, 160 insertions(+), 6 deletions(-) diff --git a/src/python/src/grpc/framework/face/implementations.py b/src/python/src/grpc/framework/face/implementations.py index c499b907207..86948b386fa 100644 --- a/src/python/src/grpc/framework/face/implementations.py +++ b/src/python/src/grpc/framework/face/implementations.py @@ -56,6 +56,38 @@ class _BaseServicer(base_interfaces.Servicer): raise _base_exceptions.NoSuchMethodError() +class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync): + + def __init__(self, front, name): + self._front = front + self._name = name + + def __call__(self, request, timeout): + return _calls.blocking_value_in_value_out( + self._front, self._name, request, timeout, 'unused trace ID') + + def async(self, request, timeout): + return _calls.future_value_in_value_out( + self._front, self._name, request, timeout, 'unused trace ID') + + +class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync): + + def __init__(self, front, name, pool): + self._front = front + self._name = name + self._pool = pool + + def __call__(self, request_iterator, timeout): + return _calls.blocking_stream_in_value_out( + self._front, self._name, request_iterator, timeout, 'unused trace ID') + + def async(self, request_iterator, timeout): + return _calls.future_stream_in_value_out( + self._front, self._name, request_iterator, timeout, 'unused trace ID', + self._pool) + + class _Server(interfaces.Server): """An interfaces.Server implementation.""" @@ -117,6 +149,12 @@ class _Stub(interfaces.Stub): self._front, name, response_consumer, abortion_callback, timeout, 'unused trace ID') + def unary_unary_sync_async(self, name): + return _UnaryUnarySyncAsync(self._front, name) + + def stream_unary_sync_async(self, name): + return _StreamUnarySyncAsync(self._front, name, self._pool) + def _aggregate_methods( pool, diff --git a/src/python/src/grpc/framework/face/interfaces.py b/src/python/src/grpc/framework/face/interfaces.py index 548e9ce4dbe..9e19106e6f3 100644 --- a/src/python/src/grpc/framework/face/interfaces.py +++ b/src/python/src/grpc/framework/face/interfaces.py @@ -59,6 +59,96 @@ class CancellableIterator(object): raise NotImplementedError() +class UnaryUnarySyncAsync(object): + """Affords invoking a unary-unary RPC synchronously or asynchronously. + + Values implementing this interface are directly callable and present an + "async" method. Both calls take a request value and a numeric timeout. + Direct invocation of a value of this type invokes its associated RPC and + blocks until the RPC's response is available. Calling the "async" method + of a value of this type invokes its associated RPC and immediately returns a + future.Future bound to the asynchronous execution of the RPC. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __call__(self, request, timeout): + """Synchronously invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + The response value for the RPC. + + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def async(self, request, timeout): + """Asynchronously invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future's result value will be the response value of the RPC. + In the event of RPC abortion, the returned Future's exception value + will be an exceptions.RpcError. + """ + raise NotImplementedError() + + +class StreamUnarySyncAsync(object): + """Affords invoking a stream-unary RPC synchronously or asynchronously. + + Values implementing this interface are directly callable and present an + "async" method. Both calls take an iterator of request values and a numeric + timeout. Direct invocation of a value of this type invokes its associated RPC + and blocks until the RPC's response is available. Calling the "async" method + of a value of this type invokes its associated RPC and immediately returns a + future.Future bound to the asynchronous execution of the RPC. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __call__(self, request_iterator, timeout): + """Synchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + The response value for the RPC. + + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def async(self, request, timeout): + """Asynchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future's result value will be the response value of the RPC. + In the event of RPC abortion, the returned Future's exception value + will be an exceptions.RpcError. + """ + raise NotImplementedError() + + @enum.unique class Abortion(enum.Enum): """Categories of RPC abortion.""" @@ -540,3 +630,27 @@ class Stub(object): request values of the RPC should be passed. """ raise NotImplementedError() + + @abc.abstractmethod + def unary_unary_sync_async(self, name): + """Creates a UnaryUnarySyncAsync value for a unary-unary RPC method. + + Args: + name: The RPC method name. + + Returns: + A UnaryUnarySyncAsync value for the named unary-unary RPC method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_unary_sync_async(self, name): + """Creates a StreamUnarySyncAsync value for a stream-unary RPC method. + + Args: + name: The RPC method name. + + Returns: + A StreamUnarySyncAsync value for the named stream-unary RPC method. + """ + raise NotImplementedError() diff --git a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py index 993098f4ae5..30ff4a32900 100644 --- a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py +++ b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -146,7 +146,8 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): - self.stub.blocking_value_in_value_out(name, request, _TIMEOUT) + sync_async = self.stub.unary_unary_sync_async(name) + sync_async(request, _TIMEOUT) def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -168,7 +169,8 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): - self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT) + sync_async = self.stub.stream_unary_sync_async(name) + sync_async(iter(requests), _TIMEOUT) def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( diff --git a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index 42db3050e1f..c87846f2ef6 100644 --- a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -190,8 +190,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( request = test_messages.request() with self.control.pause(): - response_future = self.stub.future_value_in_value_out( - name, request, _TIMEOUT) + sync_async = self.stub.unary_unary_sync_async(name) + response_future = sync_async.async(request, _TIMEOUT) self.assertIsInstance( response_future.exception(), exceptions.ExpirationError) with self.assertRaises(exceptions.ExpirationError): @@ -216,8 +216,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( requests = test_messages.requests() with self.control.pause(): - response_future = self.stub.future_stream_in_value_out( - name, iter(requests), _TIMEOUT) + sync_async = self.stub.stream_unary_sync_async(name) + response_future = sync_async.async(iter(requests), _TIMEOUT) self.assertIsInstance( response_future.exception(), exceptions.ExpirationError) with self.assertRaises(exceptions.ExpirationError):