Merge pull request #627 from nathanielmanistaatgoogle/sync-async

pull/665/head
Nathaniel Manista 10 years ago
commit 477ce9fd9a
  1. 38
      src/python/src/grpc/framework/face/implementations.py
  2. 114
      src/python/src/grpc/framework/face/interfaces.py
  3. 6
      src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
  4. 8
      src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py

@ -56,6 +56,38 @@ class _BaseServicer(base_interfaces.Servicer):
raise _base_exceptions.NoSuchMethodError()
class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync):
def __init__(self, front, name):
self._front = front
self._name = name
def __call__(self, request, timeout):
return _calls.blocking_value_in_value_out(
self._front, self._name, request, timeout, 'unused trace ID')
def async(self, request, timeout):
return _calls.future_value_in_value_out(
self._front, self._name, request, timeout, 'unused trace ID')
class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
def __init__(self, front, name, pool):
self._front = front
self._name = name
self._pool = pool
def __call__(self, request_iterator, timeout):
return _calls.blocking_stream_in_value_out(
self._front, self._name, request_iterator, timeout, 'unused trace ID')
def async(self, request_iterator, timeout):
return _calls.future_stream_in_value_out(
self._front, self._name, request_iterator, timeout, 'unused trace ID',
self._pool)
class _Server(interfaces.Server):
"""An interfaces.Server implementation."""
@ -117,6 +149,12 @@ class _Stub(interfaces.Stub):
self._front, name, response_consumer, abortion_callback, timeout,
'unused trace ID')
def unary_unary_sync_async(self, name):
return _UnaryUnarySyncAsync(self._front, name)
def stream_unary_sync_async(self, name):
return _StreamUnarySyncAsync(self._front, name, self._pool)
def _aggregate_methods(
pool,

@ -59,6 +59,96 @@ class CancellableIterator(object):
raise NotImplementedError()
class UnaryUnarySyncAsync(object):
"""Affords invoking a unary-unary RPC synchronously or asynchronously.
Values implementing this interface are directly callable and present an
"async" method. Both calls take a request value and a numeric timeout.
Direct invocation of a value of this type invokes its associated RPC and
blocks until the RPC's response is available. Calling the "async" method
of a value of this type invokes its associated RPC and immediately returns a
future.Future bound to the asynchronous execution of the RPC.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request, timeout):
"""Synchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
The response value for the RPC.
Raises:
exceptions.RpcError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def async(self, request, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A future.Future representing the RPC. In the event of RPC completion, the
returned Future's result value will be the response value of the RPC.
In the event of RPC abortion, the returned Future's exception value
will be an exceptions.RpcError.
"""
raise NotImplementedError()
class StreamUnarySyncAsync(object):
"""Affords invoking a stream-unary RPC synchronously or asynchronously.
Values implementing this interface are directly callable and present an
"async" method. Both calls take an iterator of request values and a numeric
timeout. Direct invocation of a value of this type invokes its associated RPC
and blocks until the RPC's response is available. Calling the "async" method
of a value of this type invokes its associated RPC and immediately returns a
future.Future bound to the asynchronous execution of the RPC.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request_iterator, timeout):
"""Synchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
The response value for the RPC.
Raises:
exceptions.RpcError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def async(self, request, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A future.Future representing the RPC. In the event of RPC completion, the
returned Future's result value will be the response value of the RPC.
In the event of RPC abortion, the returned Future's exception value
will be an exceptions.RpcError.
"""
raise NotImplementedError()
@enum.unique
class Abortion(enum.Enum):
"""Categories of RPC abortion."""
@ -540,3 +630,27 @@ class Stub(object):
request values of the RPC should be passed.
"""
raise NotImplementedError()
@abc.abstractmethod
def unary_unary_sync_async(self, name):
"""Creates a UnaryUnarySyncAsync value for a unary-unary RPC method.
Args:
name: The RPC method name.
Returns:
A UnaryUnarySyncAsync value for the named unary-unary RPC method.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_unary_sync_async(self, name):
"""Creates a StreamUnarySyncAsync value for a stream-unary RPC method.
Args:
name: The RPC method name.
Returns:
A StreamUnarySyncAsync value for the named stream-unary RPC method.
"""
raise NotImplementedError()

@ -147,7 +147,8 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
sync_async = self.stub.unary_unary_sync_async(name)
sync_async(request, _TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -169,7 +170,8 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
sync_async = self.stub.stream_unary_sync_async(name)
sync_async(iter(requests), _TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (

@ -190,8 +190,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
with self.control.pause():
response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT)
sync_async = self.stub.unary_unary_sync_async(name)
response_future = sync_async.async(request, _TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@ -216,8 +216,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
requests = test_messages.requests()
with self.control.pause():
response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT)
sync_async = self.stub.stream_unary_sync_async(name)
response_future = sync_async.async(iter(requests), _TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):

Loading…
Cancel
Save