Refactoring within Python RPC Framework

The assembly and face layers were mostly redundant except that the
assembly layer had far-better interfaces and the face layer had more
of a reason to exist. Now they are merged.
pull/979/head
Nathaniel Manista 10 years ago
parent 3631e82c89
commit 5c87a30c41
  1. 30
      src/python/src/grpc/_adapter/_face_test_case.py
  2. 41
      src/python/src/grpc/early_adopter/_face_utilities.py
  3. 47
      src/python/src/grpc/early_adopter/_reexport.py
  4. 11
      src/python/src/grpc/early_adopter/implementations.py
  5. 111
      src/python/src/grpc/framework/assembly/implementations.py
  6. 28
      src/python/src/grpc/framework/assembly/implementations_test.py
  7. 56
      src/python/src/grpc/framework/assembly/interfaces.py
  8. 179
      src/python/src/grpc/framework/assembly/utilities.py
  9. 18
      src/python/src/grpc/framework/face/_service.py
  10. 30
      src/python/src/grpc/framework/face/_test_case.py
  11. 268
      src/python/src/grpc/framework/face/implementations.py
  12. 460
      src/python/src/grpc/framework/face/interfaces.py
  13. 16
      src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
  14. 122
      src/python/src/grpc/framework/face/testing/digest.py
  15. 9
      src/python/src/grpc/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
  16. 17
      src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
  17. 38
      src/python/src/grpc/framework/face/testing/service.py
  18. 8
      src/python/src/grpc/framework/face/testing/stock_service.py
  19. 53
      src/python/src/grpc/framework/face/testing/test_case.py
  20. 174
      src/python/src/grpc/framework/face/utilities.py

@ -50,31 +50,12 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
"""Provides abstract Face-layer tests a GRPC-backed implementation."""
def set_up_implementation(
self,
name,
methods,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods,
multi_method):
self, name, methods, method_implementations,
multi_method_implementation):
pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
servicer = face_implementations.servicer(
pool,
inline_value_in_value_out_methods=inline_value_in_value_out_methods,
inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
event_value_in_value_out_methods=event_value_in_value_out_methods,
event_value_in_stream_out_methods=event_value_in_stream_out_methods,
event_stream_in_value_out_methods=event_stream_in_value_out_methods,
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
multi_method=multi_method)
pool, method_implementations, multi_method_implementation)
serialization = serial.serialization(methods)
@ -96,9 +77,8 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
rear_link.join_fore_link(front)
front.join_rear_link(rear_link)
server = face_implementations.server()
stub = face_implementations.stub(front, pool)
return server, stub, (rear_link, fore_link, front, back)
stub = face_implementations.generic_stub(front, pool)
return stub, (rear_link, fore_link, front, back)
def tear_down_implementation(self, memo):
rear_link, fore_link, front, back = memo

@ -30,23 +30,20 @@
import abc
import collections
# assembly_interfaces is referenced from specification in this module.
from grpc.framework.assembly import interfaces as assembly_interfaces # pylint: disable=unused-import
from grpc.framework.assembly import utilities as assembly_utilities
# face_interfaces is referenced from specification in this module.
from grpc.framework.common import cardinality
from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import
from grpc.framework.face import utilities as face_utilities
from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
# TODO(issue 726): Kill the "implementations" attribute of this in favor
# of the same-information-less-bogusly-represented "cardinalities".
class InvocationBreakdown(object):
"""An intermediate representation of invocation-side views of RPC methods.
Attributes:
cardinalities: A dictionary from RPC method name to interfaces.Cardinality
value.
implementations: A dictionary from RPC method name to
assembly_interfaces.MethodImplementation describing the method.
request_serializers: A dictionary from RPC method name to callable
behavior to be used serializing request values for the RPC.
response_deserializers: A dictionary from RPC method name to callable
@ -59,8 +56,7 @@ class _EasyInvocationBreakdown(
InvocationBreakdown,
collections.namedtuple(
'_EasyInvocationBreakdown',
('cardinalities', 'implementations', 'request_serializers',
'response_deserializers'))):
('cardinalities', 'request_serializers', 'response_deserializers'))):
pass
@ -68,8 +64,8 @@ class ServiceBreakdown(object):
"""An intermediate representation of service-side views of RPC methods.
Attributes:
implementations: A dictionary from RPC method name
assembly_interfaces.MethodImplementation implementing the RPC method.
implementations: A dictionary from RPC method name to
face_interfaces.MethodImplementation implementing the RPC method.
request_deserializers: A dictionary from RPC method name to callable
behavior to be used deserializing request values for the RPC.
response_serializers: A dictionary from RPC method name to callable
@ -97,25 +93,14 @@ def break_down_invocation(method_descriptions):
An InvocationBreakdown corresponding to the given method descriptions.
"""
cardinalities = {}
implementations = {}
request_serializers = {}
response_deserializers = {}
for name, method_description in method_descriptions.iteritems():
cardinality = method_description.cardinality()
cardinalities[name] = cardinality
if cardinality is interfaces.Cardinality.UNARY_UNARY:
implementations[name] = assembly_utilities.unary_unary_inline(None)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
implementations[name] = assembly_utilities.unary_stream_inline(None)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
implementations[name] = assembly_utilities.stream_unary_inline(None)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
implementations[name] = assembly_utilities.stream_stream_inline(None)
cardinalities[name] = method_description.cardinality()
request_serializers[name] = method_description.serialize_request
response_deserializers[name] = method_description.deserialize_response
return _EasyInvocationBreakdown(
cardinalities, implementations, request_serializers,
response_deserializers)
cardinalities, request_serializers, response_deserializers)
def break_down_service(method_descriptions):
@ -139,28 +124,28 @@ def break_down_service(method_descriptions):
service_behavior=method_description.service_unary_unary):
return service_behavior(
request, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.unary_unary_inline(service)
implementations[name] = face_utilities.unary_unary_inline(service)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
def service(
request, face_rpc_context,
service_behavior=method_description.service_unary_stream):
return service_behavior(
request, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.unary_stream_inline(service)
implementations[name] = face_utilities.unary_stream_inline(service)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
def service(
request_iterator, face_rpc_context,
service_behavior=method_description.service_stream_unary):
return service_behavior(
request_iterator, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.stream_unary_inline(service)
implementations[name] = face_utilities.stream_unary_inline(service)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
def service(
request_iterator, face_rpc_context,
service_behavior=method_description.service_stream_stream):
return service_behavior(
request_iterator, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.stream_stream_inline(service)
implementations[name] = face_utilities.stream_stream_inline(service)
request_deserializers[name] = method_description.deserialize_request
response_serializers[name] = method_description.serialize_response

@ -27,12 +27,20 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc.framework.common import cardinality
from grpc.framework.face import exceptions as face_exceptions
from grpc.framework.face import interfaces as face_interfaces
from grpc.framework.foundation import future
from grpc.early_adopter import exceptions
from grpc.early_adopter import interfaces
_EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY = {
interfaces.Cardinality.UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
interfaces.Cardinality.UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
interfaces.Cardinality.STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
interfaces.Cardinality.STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
}
_ABORTION_REEXPORT = {
face_interfaces.Abortion.CANCELLED: interfaces.Abortion.CANCELLED,
face_interfaces.Abortion.EXPIRED: interfaces.Abortion.EXPIRED,
@ -142,28 +150,28 @@ class _RpcContext(interfaces.RpcContext):
class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync):
def __init__(self, face_unary_unary_sync_async):
self._underlying = face_unary_unary_sync_async
def __init__(self, face_unary_unary_multi_callable):
self._underlying = face_unary_unary_multi_callable
def __call__(self, request, timeout):
return _call_reexporting_errors(
self._underlying, request, timeout)
def async(self, request, timeout):
return _ReexportedFuture(self._underlying.async(request, timeout))
return _ReexportedFuture(self._underlying.future(request, timeout))
class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
def __init__(self, face_stream_unary_sync_async):
self._underlying = face_stream_unary_sync_async
def __init__(self, face_stream_unary_multi_callable):
self._underlying = face_stream_unary_multi_callable
def __call__(self, request_iterator, timeout):
return _call_reexporting_errors(
self._underlying, request_iterator, timeout)
def async(self, request_iterator, timeout):
return _ReexportedFuture(self._underlying.async(request_iterator, timeout))
return _ReexportedFuture(self._underlying.future(request_iterator, timeout))
class _Stub(interfaces.Stub):
@ -182,31 +190,40 @@ class _Stub(interfaces.Stub):
def __getattr__(self, attr):
underlying_attr = self._assembly_stub.__getattr__(attr)
cardinality = self._cardinalities.get(attr)
method_cardinality = self._cardinalities.get(attr)
# TODO(nathaniel): unify this trick with its other occurrence in the code.
if cardinality is None:
for name, cardinality in self._cardinalities.iteritems():
if method_cardinality is None:
for name, method_cardinality in self._cardinalities.iteritems():
last_slash_index = name.rfind('/')
if 0 <= last_slash_index and name[last_slash_index + 1:] == attr:
break
else:
raise AttributeError(attr)
if cardinality is interfaces.Cardinality.UNARY_UNARY:
if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
return _UnaryUnarySyncAsync(underlying_attr)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
return lambda request, timeout: _CancellableIterator(
underlying_attr(request, timeout))
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
elif method_cardinality is interfaces.Cardinality.STREAM_UNARY:
return _StreamUnarySyncAsync(underlying_attr)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
elif method_cardinality is interfaces.Cardinality.STREAM_STREAM:
return lambda request_iterator, timeout: _CancellableIterator(
underlying_attr(request_iterator, timeout))
else:
raise AttributeError(attr)
def common_cardinalities(early_adopter_cardinalities):
common_cardinalities = {}
for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems():
common_cardinalities[name] = _EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY[
early_adopter_cardinality]
return common_cardinalities
def rpc_context(face_rpc_context):
return _RpcContext(face_rpc_context)
def stub(assembly_stub, cardinalities):
return _Stub(assembly_stub, cardinalities)
def stub(face_stub, cardinalities):
return _Stub(face_stub, cardinalities)

@ -33,7 +33,7 @@ import threading
from grpc._adapter import fore as _fore
from grpc._adapter import rear as _rear
from grpc.early_adopter import _assembly_utilities
from grpc.early_adopter import _face_utilities
from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
from grpc.framework.assembly import implementations as _assembly_implementations
@ -95,12 +95,13 @@ class _Server(interfaces.Server):
def _build_stub(breakdown, activated_rear_link):
assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub(
breakdown.implementations, activated_rear_link)
_reexport.common_cardinalities(breakdown.cardinalities),
activated_rear_link)
return _reexport.stub(assembly_stub, breakdown.cardinalities)
def _build_server(methods, port, private_key, certificate_chain):
breakdown = _assembly_utilities.break_down_service(methods)
breakdown = _face_utilities.break_down_service(methods)
return _Server(breakdown, port, private_key, certificate_chain)
@ -117,7 +118,7 @@ def insecure_stub(methods, host, port):
Returns:
An interfaces.Stub affording RPC invocation.
"""
breakdown = _assembly_utilities.break_down_invocation(methods)
breakdown = _face_utilities.break_down_invocation(methods)
activated_rear_link = _rear.activated_rear_link(
host, port, breakdown.request_serializers,
breakdown.response_deserializers)
@ -147,7 +148,7 @@ def secure_stub(
Returns:
An interfaces.Stub affording RPC invocation.
"""
breakdown = _assembly_utilities.break_down_invocation(methods)
breakdown = _face_utilities.break_down_invocation(methods)
activated_rear_link = _rear.secure_activated_rear_link(
host, port, breakdown.request_serializers,
breakdown.response_deserializers, root_certificates, private_key,

@ -66,7 +66,7 @@ class _FaceStub(object):
self._rear_link.start()
self._rear_link.join_fore_link(self._front)
self._front.join_rear_link(self._rear_link)
self._under_stub = face_implementations.stub(self._front, self._pool)
self._under_stub = face_implementations.generic_stub(self._front, self._pool)
def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
@ -86,18 +86,18 @@ class _FaceStub(object):
return getattr(self._under_stub, attr)
def _behaviors(implementations, front, pool):
def _behaviors(method_cardinalities, front, pool):
behaviors = {}
stub = face_implementations.stub(front, pool)
for name, implementation in implementations.iteritems():
if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
behaviors[name] = stub.unary_unary_sync_async(name)
elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
stub = face_implementations.generic_stub(front, pool)
for name, method_cardinality in method_cardinalities.iteritems():
if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
behaviors[name] = stub.unary_unary_multi_callable(name)
elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
behaviors[name] = lambda request, context, bound_name=name: (
stub.inline_value_in_stream_out(bound_name, request, context))
elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
behaviors[name] = stub.stream_unary_sync_async(name)
elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
behaviors[name] = stub.stream_unary_multi_callable(name)
elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
behaviors[name] = lambda request_iterator, context, bound_name=name: (
stub.inline_stream_in_stream_out(
bound_name, request_iterator, context))
@ -106,8 +106,8 @@ def _behaviors(implementations, front, pool):
class _DynamicInlineStub(object):
def __init__(self, implementations, rear_link):
self._implementations = implementations
def __init__(self, cardinalities, rear_link):
self._cardinalities = cardinalities
self._rear_link = rear_link
self._lock = threading.Lock()
self._pool = None
@ -123,7 +123,7 @@ class _DynamicInlineStub(object):
self._rear_link.join_fore_link(self._front)
self._front.join_rear_link(self._rear_link)
self._behaviors = _behaviors(
self._implementations, self._front, self._pool)
self._cardinalities, self._front, self._pool)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
@ -151,58 +151,6 @@ class _DynamicInlineStub(object):
return behavior
def _servicer(implementations, pool):
inline_value_in_value_out_methods = {}
inline_value_in_stream_out_methods = {}
inline_stream_in_value_out_methods = {}
inline_stream_in_stream_out_methods = {}
event_value_in_value_out_methods = {}
event_value_in_stream_out_methods = {}
event_stream_in_value_out_methods = {}
event_stream_in_stream_out_methods = {}
for name, implementation in implementations.iteritems():
if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
if implementation.style is style.Service.INLINE:
inline_value_in_value_out_methods[name] = (
face_utilities.inline_unary_unary_method(implementation.unary_unary_inline))
elif implementation.style is style.Service.EVENT:
event_value_in_value_out_methods[name] = (
face_utilities.event_unary_unary_method(implementation.unary_unary_event))
elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
if implementation.style is style.Service.INLINE:
inline_value_in_stream_out_methods[name] = (
face_utilities.inline_unary_stream_method(implementation.unary_stream_inline))
elif implementation.style is style.Service.EVENT:
event_value_in_stream_out_methods[name] = (
face_utilities.event_unary_stream_method(implementation.unary_stream_event))
if implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
if implementation.style is style.Service.INLINE:
inline_stream_in_value_out_methods[name] = (
face_utilities.inline_stream_unary_method(implementation.stream_unary_inline))
elif implementation.style is style.Service.EVENT:
event_stream_in_value_out_methods[name] = (
face_utilities.event_stream_unary_method(implementation.stream_unary_event))
elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
if implementation.style is style.Service.INLINE:
inline_stream_in_stream_out_methods[name] = (
face_utilities.inline_stream_stream_method(implementation.stream_stream_inline))
elif implementation.style is style.Service.EVENT:
event_stream_in_stream_out_methods[name] = (
face_utilities.event_stream_stream_method(implementation.stream_stream_event))
return face_implementations.servicer(
pool,
inline_value_in_value_out_methods=inline_value_in_value_out_methods,
inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
event_value_in_value_out_methods=event_value_in_value_out_methods,
event_value_in_stream_out_methods=event_value_in_stream_out_methods,
event_stream_in_value_out_methods=event_stream_in_value_out_methods,
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods)
class _ServiceAssembly(interfaces.Server):
def __init__(self, implementations, fore_link):
@ -215,7 +163,8 @@ class _ServiceAssembly(interfaces.Server):
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
servicer = _servicer(self._implementations, self._pool)
servicer = face_implementations.servicer(
self._pool, self._implementations, None)
self._back = tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
_ONE_DAY_IN_SECONDS)
@ -251,7 +200,7 @@ class _ServiceAssembly(interfaces.Server):
def assemble_face_stub(activated_rear_link):
"""Assembles a face_interfaces.Stub.
"""Assembles a face_interfaces.GenericStub.
The returned object is a context manager and may only be used in context to
invoke RPCs.
@ -262,12 +211,12 @@ def assemble_face_stub(activated_rear_link):
when passed to this method.
Returns:
A face_interfaces.Stub on which, in context, RPCs can be invoked.
A face_interfaces.GenericStub on which, in context, RPCs can be invoked.
"""
return _FaceStub(activated_rear_link)
def assemble_dynamic_inline_stub(implementations, activated_rear_link):
def assemble_dynamic_inline_stub(cardinalities, activated_rear_link):
"""Assembles a stub with method names for attributes.
The returned object is a context manager and may only be used in context to
@ -276,29 +225,27 @@ def assemble_dynamic_inline_stub(implementations, activated_rear_link):
The returned object, when used in context, will respond to attribute access
as follows: if the requested attribute is the name of a unary-unary RPC
method, the value of the attribute will be a
face_interfaces.UnaryUnarySyncAsync with which to invoke the RPC method. If
the requested attribute is the name of a unary-stream RPC method, the value
of the attribute will be a callable with the semantics of
face_interfaces.Stub.inline_value_in_stream_out, minus the "name" parameter,
face_interfaces.UnaryUnaryMultiCallable with which to invoke the RPC method.
If the requested attribute is the name of a unary-stream RPC method, the
value of the attribute will be a face_interfaces.UnaryStreamMultiCallable
with which to invoke the RPC method. If the requested attribute is the name
of a stream-unary RPC method, the value of the attribute will be a
face_interfaces.StreamUnarySyncAsync with which to invoke the RPC method. If
the requested attribute is the name of a stream-stream RPC method, the value
of the attribute will be a callable with the semantics of
face_interfaces.Stub.inline_stream_in_stream_out, minus the "name" parameter,
face_interfaces.StreamUnaryMultiCallable with which to invoke the RPC method.
If the requested attribute is the name of a stream-stream RPC method, the
value of the attribute will be a face_interfaces.StreamStreamMultiCallable
with which to invoke the RPC method.
Args:
implementations: A dictionary from RPC method name to
interfaces.MethodImplementation.
cardinalities: A dictionary from RPC method name to cardinality.Cardinality
value identifying the cardinality of the named RPC method.
activated_rear_link: An object that is both a tickets_interfaces.RearLink
and an activated.Activated. The object should be in the inactive state
when passed to this method.
Returns:
A stub on which, in context, RPCs can be invoked.
A face_interfaces.DynamicStub on which, in context, RPCs can be invoked.
"""
return _DynamicInlineStub(implementations, activated_rear_link)
return _DynamicInlineStub(cardinalities, activated_rear_link)
def assemble_service(implementations, activated_fore_link):
@ -306,7 +253,7 @@ def assemble_service(implementations, activated_fore_link):
Args:
implementations: A dictionary from RPC method name to
interfaces.MethodImplementation.
face_interfaces.MethodImplementation.
activated_fore_link: An object that is both a tickets_interfaces.ForeLink
and an activated.Activated. The object should be in the inactive state
when passed to this method.

@ -35,11 +35,11 @@ import threading
import unittest
from grpc.framework.assembly import implementations
from grpc.framework.assembly import utilities
from grpc.framework.base import interfaces
from grpc.framework.base.packets import packets as tickets
from grpc.framework.base.packets import interfaces as tickets_interfaces
from grpc.framework.base.packets import null
from grpc.framework.face import utilities as face_utilities
from grpc.framework.foundation import logging_pool
from grpc._junkdrawer import math_pb2
@ -81,12 +81,16 @@ def _sum(request_iterator, unused_context):
_IMPLEMENTATIONS = {
DIV: utilities.unary_unary_inline(_div),
DIV_MANY: utilities.stream_stream_inline(_div_many),
FIB: utilities.unary_stream_inline(_fib),
SUM: utilities.stream_unary_inline(_sum),
DIV: face_utilities.unary_unary_inline(_div),
DIV_MANY: face_utilities.stream_stream_inline(_div_many),
FIB: face_utilities.unary_stream_inline(_fib),
SUM: face_utilities.stream_unary_inline(_sum),
}
_CARDINALITIES = {
name: implementation.cardinality
for name, implementation in _IMPLEMENTATIONS.iteritems()}
_TIMEOUT = 10
@ -170,8 +174,8 @@ class FaceStubTest(unittest.TestCase):
face_stub = implementations.assemble_face_stub(pipe)
with service, face_stub:
sync_async = face_stub.stream_unary_sync_async(SUM)
response_future = sync_async.async(
multi_callable = face_stub.stream_unary_multi_callable(SUM)
response_future = multi_callable.future(
(math_pb2.Num(num=index) for index in range(stream_length)),
_TIMEOUT)
self.assertEqual(
@ -214,7 +218,7 @@ class DynamicInlineStubTest(unittest.TestCase):
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_IMPLEMENTATIONS, pipe)
_CARDINALITIES, pipe)
service.start()
with dynamic_stub:
@ -229,7 +233,7 @@ class DynamicInlineStubTest(unittest.TestCase):
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_IMPLEMENTATIONS, pipe)
_CARDINALITIES, pipe)
with service, dynamic_stub:
response_iterator = dynamic_stub.Fib(
@ -244,10 +248,10 @@ class DynamicInlineStubTest(unittest.TestCase):
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_IMPLEMENTATIONS, pipe)
_CARDINALITIES, pipe)
with service, dynamic_stub:
response_future = dynamic_stub.Sum.async(
response_future = dynamic_stub.Sum.future(
(math_pb2.Num(num=index) for index in range(stream_length)),
_TIMEOUT)
self.assertEqual(
@ -261,7 +265,7 @@ class DynamicInlineStubTest(unittest.TestCase):
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_IMPLEMENTATIONS, pipe)
_CARDINALITIES, pipe)
with service, dynamic_stub:
response_iterator = dynamic_stub.DivMany(

@ -33,63 +33,7 @@
import abc
# cardinality, style, and stream are referenced from specification in this
# module.
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.common import style # pylint: disable=unused-import
from grpc.framework.foundation import activated
from grpc.framework.foundation import stream # pylint: disable=unused-import
class MethodImplementation(object):
"""A sum type that describes an RPC method implementation.
Attributes:
cardinality: A cardinality.Cardinality value.
style: A style.Service value.
unary_unary_inline: The implementation of the RPC method as a callable
value that takes a request value and a face_interfaces.RpcContext object
and returns a response value. Only non-None if cardinality is
cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
unary_stream_inline: The implementation of the RPC method as a callable
value that takes a request value and a face_interfaces.RpcContext object
and returns an iterator of response values. Only non-None if cardinality
is cardinality.Cardinality.UNARY_STREAM and style is
style.Service.INLINE.
stream_unary_inline: The implementation of the RPC method as a callable
value that takes an iterator of request values and a
face_interfaces.RpcContext object and returns a response value. Only
non-None if cardinality is cardinality.Cardinality.STREAM_UNARY and style
is style.Service.INLINE.
stream_stream_inline: The implementation of the RPC method as a callable
value that takes an iterator of request values and a
face_interfaces.RpcContext object and returns an iterator of response
values. Only non-None if cardinality is
cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
unary_unary_event: The implementation of the RPC method as a callable value
that takes a request value, a response callback to which to pass the
response value of the RPC, and a face_interfaces.RpcContext. Only
non-None if cardinality is cardinality.Cardinality.UNARY_UNARY and style
is style.Service.EVENT.
unary_stream_event: The implementation of the RPC method as a callable
value that takes a request value, a stream.Consumer to which to pass the
the response values of the RPC, and a face_interfaces.RpcContext. Only
non-None if cardinality is cardinality.Cardinality.UNARY_STREAM and style
is style.Service.EVENT.
stream_unary_event: The implementation of the RPC method as a callable
value that takes a response callback to which to pass the response value
of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
to which the request values of the RPC should be passed. Only non-None if
cardinality is cardinality.Cardinality.STREAM_UNARY and style is
style.Service.EVENT.
stream_stream_event: The implementation of the RPC method as a callable
value that takes a stream.Consumer to which to pass the response values
of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
to which the request values of the RPC should be passed. Only non-None if
cardinality is cardinality.Cardinality.STREAM_STREAM and style is
style.Service.EVENT.
"""
__metaclass__ = abc.ABCMeta
class Server(activated.Activated):

@ -1,179 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for assembling RPC framework values."""
import collections
from grpc.framework.assembly import interfaces
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.face import interfaces as face_interfaces
from grpc.framework.foundation import stream
class _MethodImplementation(
interfaces.MethodImplementation,
collections.namedtuple(
'_MethodImplementation',
['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline',
'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event',
'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])):
pass
def unary_unary_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable value
that takes a request value and a face_interfaces.RpcContext object and
returns a response value.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior,
None, None, None, None, None, None, None)
def unary_stream_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value and a face_interfaces.RpcContext object
and returns an iterator of response values.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None,
behavior, None, None, None, None, None, None)
def stream_unary_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
value that takes an iterator of request values and a
face_interfaces.RpcContext object and returns a response value.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None,
behavior, None, None, None, None, None)
def stream_stream_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
value that takes an iterator of request values and a
face_interfaces.RpcContext object and returns an iterator of response
values.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None,
None, behavior, None, None, None, None)
def unary_unary_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable
value that takes a request value, a response callback to which to pass
the response value of the RPC, and a face_interfaces.RpcContext.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None,
None, None, behavior, None, None, None)
def unary_stream_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value, a stream.Consumer to which to pass the
the response values of the RPC, and a face_interfaces.RpcContext.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None,
None, None, None, behavior, None, None)
def stream_unary_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
value that takes a response callback to which to pass the response value
of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
to which the request values of the RPC should be passed.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None,
None, None, None, None, behavior, None)
def stream_stream_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
value that takes a stream.Consumer to which to pass the response values
of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
to which the request values of the RPC should be passed.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None,
None, None, None, None, None, behavior)

@ -105,15 +105,14 @@ def adapt_inline_value_in_value_out(method):
def adaptation(response_consumer, operation_context):
rpc_context = _control.RpcContext(operation_context)
return stream_util.TransformingConsumer(
lambda request: method.service(request, rpc_context), response_consumer)
lambda request: method(request, rpc_context), response_consumer)
return adaptation
def adapt_inline_value_in_stream_out(method):
def adaptation(response_consumer, operation_context):
rpc_context = _control.RpcContext(operation_context)
return _ValueInStreamOutConsumer(
method.service, rpc_context, response_consumer)
return _ValueInStreamOutConsumer(method, rpc_context, response_consumer)
return adaptation
@ -123,7 +122,7 @@ def adapt_inline_stream_in_value_out(method, pool):
operation_context.add_termination_callback(rendezvous.set_outcome)
def in_pool_thread():
response_consumer.consume_and_terminate(
method.service(rendezvous, _control.RpcContext(operation_context)))
method(rendezvous, _control.RpcContext(operation_context)))
pool.submit(_pool_wrap(in_pool_thread, operation_context))
return rendezvous
return adaptation
@ -149,7 +148,7 @@ def adapt_inline_stream_in_stream_out(method, pool):
operation_context.add_termination_callback(rendezvous.set_outcome)
def in_pool_thread():
_control.pipe_iterator_to_consumer(
method.service(rendezvous, _control.RpcContext(operation_context)),
method(rendezvous, _control.RpcContext(operation_context)),
response_consumer, operation_context.is_active, True)
pool.submit(_pool_wrap(in_pool_thread, operation_context))
return rendezvous
@ -159,7 +158,7 @@ def adapt_inline_stream_in_stream_out(method, pool):
def adapt_event_value_in_value_out(method):
def adaptation(response_consumer, operation_context):
def on_payload(payload):
method.service(
method(
payload, response_consumer.consume_and_terminate,
_control.RpcContext(operation_context))
return _control.UnaryConsumer(on_payload)
@ -169,7 +168,7 @@ def adapt_event_value_in_value_out(method):
def adapt_event_value_in_stream_out(method):
def adaptation(response_consumer, operation_context):
def on_payload(payload):
method.service(
method(
payload, response_consumer, _control.RpcContext(operation_context))
return _control.UnaryConsumer(on_payload)
return adaptation
@ -178,12 +177,11 @@ def adapt_event_value_in_stream_out(method):
def adapt_event_stream_in_value_out(method):
def adaptation(response_consumer, operation_context):
rpc_context = _control.RpcContext(operation_context)
return method.service(response_consumer.consume_and_terminate, rpc_context)
return method(response_consumer.consume_and_terminate, rpc_context)
return adaptation
def adapt_event_stream_in_stream_out(method):
def adaptation(response_consumer, operation_context):
return method.service(
response_consumer, _control.RpcContext(operation_context))
return method(response_consumer, _control.RpcContext(operation_context))
return adaptation

@ -42,37 +42,17 @@ class FaceTestCase(test_case.FaceTestCase):
"""Provides abstract Face-layer tests an in-memory implementation."""
def set_up_implementation(
self,
name,
methods,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods,
multi_method):
self, name, methods, method_implementations,
multi_method_implementation):
servicer_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
stub_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
servicer = implementations.servicer(
servicer_pool,
inline_value_in_value_out_methods=inline_value_in_value_out_methods,
inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
event_value_in_value_out_methods=event_value_in_value_out_methods,
event_value_in_stream_out_methods=event_value_in_stream_out_methods,
event_stream_in_value_out_methods=event_stream_in_value_out_methods,
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
multi_method=multi_method)
servicer_pool, method_implementations, multi_method_implementation)
linked_pair = base_util.linked_pair(servicer, _TIMEOUT)
server = implementations.server()
stub = implementations.stub(linked_pair.front, stub_pool)
return server, stub, (servicer_pool, stub_pool, linked_pair)
stub = implementations.generic_stub(linked_pair.front, stub_pool)
return stub, (servicer_pool, stub_pool, linked_pair)
def tear_down_implementation(self, memo):
servicer_pool, stub_pool, linked_pair = memo

@ -29,6 +29,8 @@
"""Entry points into the Face layer of RPC Framework."""
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.base import exceptions as _base_exceptions
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.face import _calls
@ -56,7 +58,7 @@ class _BaseServicer(base_interfaces.Servicer):
raise _base_exceptions.NoSuchMethodError()
class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync):
class _UnaryUnaryMultiCallable(interfaces.UnaryUnaryMultiCallable):
def __init__(self, front, name):
self._front = front
@ -66,12 +68,33 @@ class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync):
return _calls.blocking_value_in_value_out(
self._front, self._name, request, timeout, 'unused trace ID')
def async(self, request, timeout):
def future(self, request, timeout):
return _calls.future_value_in_value_out(
self._front, self._name, request, timeout, 'unused trace ID')
def event(self, request, response_callback, abortion_callback, timeout):
return _calls.event_value_in_value_out(
self._front, self._name, request, response_callback, abortion_callback,
timeout, 'unused trace ID')
class _UnaryStreamMultiCallable(interfaces.UnaryStreamMultiCallable):
def __init__(self, front, name):
self._front = front
self._name = name
class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
def __call__(self, request, timeout):
return _calls.inline_value_in_stream_out(
self._front, self._name, request, timeout, 'unused trace ID')
def event(self, request, response_consumer, abortion_callback, timeout):
return _calls.event_value_in_stream_out(
self._front, self._name, request, response_consumer, abortion_callback,
timeout, 'unused trace ID')
class _StreamUnaryMultiCallable(interfaces.StreamUnaryMultiCallable):
def __init__(self, front, name, pool):
self._front = front
@ -82,18 +105,37 @@ class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
return _calls.blocking_stream_in_value_out(
self._front, self._name, request_iterator, timeout, 'unused trace ID')
def async(self, request_iterator, timeout):
def future(self, request_iterator, timeout):
return _calls.future_stream_in_value_out(
self._front, self._name, request_iterator, timeout, 'unused trace ID',
self._pool)
def event(self, response_callback, abortion_callback, timeout):
return _calls.event_stream_in_value_out(
self._front, self._name, response_callback, abortion_callback, timeout,
'unused trace ID')
class _Server(interfaces.Server):
"""An interfaces.Server implementation."""
class _StreamStreamMultiCallable(interfaces.StreamStreamMultiCallable):
class _Stub(interfaces.Stub):
"""An interfaces.Stub implementation."""
def __init__(self, front, name, pool):
self._front = front
self._name = name
self._pool = pool
def __call__(self, request_iterator, timeout):
return _calls.inline_stream_in_stream_out(
self._front, self._name, request_iterator, timeout, 'unused trace ID',
self._pool)
def event(self, response_consumer, abortion_callback, timeout):
return _calls.event_stream_in_stream_out(
self._front, self._name, response_consumer, abortion_callback, timeout,
'unused trace ID')
class _GenericStub(interfaces.GenericStub):
"""An interfaces.GenericStub implementation."""
def __init__(self, front, pool):
self._front = front
@ -149,136 +191,128 @@ class _Stub(interfaces.Stub):
self._front, name, response_consumer, abortion_callback, timeout,
'unused trace ID')
def unary_unary_sync_async(self, name):
return _UnaryUnarySyncAsync(self._front, name)
def stream_unary_sync_async(self, name):
return _StreamUnarySyncAsync(self._front, name, self._pool)
def _aggregate_methods(
pool,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods):
"""Aggregates methods coded in according to different interfaces."""
methods = {}
def adapt_unpooled_methods(adapted_methods, unadapted_methods, adaptation):
if unadapted_methods is not None:
for name, unadapted_method in unadapted_methods.iteritems():
adapted_methods[name] = adaptation(unadapted_method)
def adapt_pooled_methods(adapted_methods, unadapted_methods, adaptation):
if unadapted_methods is not None:
for name, unadapted_method in unadapted_methods.iteritems():
adapted_methods[name] = adaptation(unadapted_method, pool)
adapt_unpooled_methods(
methods, inline_value_in_value_out_methods,
_service.adapt_inline_value_in_value_out)
adapt_unpooled_methods(
methods, inline_value_in_stream_out_methods,
_service.adapt_inline_value_in_stream_out)
adapt_pooled_methods(
methods, inline_stream_in_value_out_methods,
_service.adapt_inline_stream_in_value_out)
adapt_pooled_methods(
methods, inline_stream_in_stream_out_methods,
_service.adapt_inline_stream_in_stream_out)
adapt_unpooled_methods(
methods, event_value_in_value_out_methods,
_service.adapt_event_value_in_value_out)
adapt_unpooled_methods(
methods, event_value_in_stream_out_methods,
_service.adapt_event_value_in_stream_out)
adapt_unpooled_methods(
methods, event_stream_in_value_out_methods,
_service.adapt_event_stream_in_value_out)
adapt_unpooled_methods(
methods, event_stream_in_stream_out_methods,
_service.adapt_event_stream_in_stream_out)
return methods
def servicer(
pool,
inline_value_in_value_out_methods=None,
inline_value_in_stream_out_methods=None,
inline_stream_in_value_out_methods=None,
inline_stream_in_stream_out_methods=None,
event_value_in_value_out_methods=None,
event_value_in_stream_out_methods=None,
event_stream_in_value_out_methods=None,
event_stream_in_stream_out_methods=None,
multi_method=None):
def unary_unary_multi_callable(self, name):
return _UnaryUnaryMultiCallable(self._front, name)
def unary_stream_multi_callable(self, name):
return _UnaryStreamMultiCallable(self._front, name)
def stream_unary_multi_callable(self, name):
return _StreamUnaryMultiCallable(self._front, name, self._pool)
def stream_stream_multi_callable(self, name):
return _StreamStreamMultiCallable(self._front, name, self._pool)
class _DynamicStub(interfaces.DynamicStub):
"""An interfaces.DynamicStub implementation."""
def __init__(self, cardinalities, front, pool):
self._cardinalities = cardinalities
self._front = front
self._pool = pool
def __getattr__(self, attr):
cardinality = self._cardinalities.get(attr)
if cardinality is cardinality.Cardinality.UNARY_UNARY:
return _UnaryUnaryMultiCallable(self._front, attr)
elif cardinality is cardinality.Cardinality.UNARY_STREAM:
return _UnaryStreamMultiCallable(self._front, attr)
elif cardinality is cardinality.Cardinality.STREAM_UNARY:
return _StreamUnaryMultiCallable(self._front, attr, self._pool)
elif cardinality is cardinality.Cardinality.STREAM_STREAM:
return _StreamStreamMultiCallable(self._front, attr, self._pool)
else:
raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
def _adapt_method_implementations(method_implementations, pool):
adapted_implementations = {}
for name, method_implementation in method_implementations.iteritems():
if method_implementation.style is style.Service.INLINE:
if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
adapted_implementations[name] = _service.adapt_inline_value_in_value_out(
method_implementation.unary_unary_inline)
elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
adapted_implementations[name] = _service.adapt_inline_value_in_stream_out(
method_implementation.unary_stream_inline)
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
adapted_implementations[name] = _service.adapt_inline_stream_in_value_out(
method_implementation.stream_unary_inline, pool)
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
adapted_implementations[name] = _service.adapt_inline_stream_in_stream_out(
method_implementation.stream_stream_inline, pool)
elif method_implementation.style is style.Service.EVENT:
if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
adapted_implementations[name] = _service.adapt_event_value_in_value_out(
method_implementation.unary_unary_event)
elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
adapted_implementations[name] = _service.adapt_event_value_in_stream_out(
method_implementation.unary_stream_event)
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
adapted_implementations[name] = _service.adapt_event_stream_in_value_out(
method_implementation.stream_unary_event)
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
adapted_implementations[name] = _service.adapt_event_stream_in_stream_out(
method_implementation.stream_stream_event)
return adapted_implementations
def servicer(pool, method_implementations, multi_method_implementation):
"""Creates a base_interfaces.Servicer.
The key sets of the passed dictionaries must be disjoint. It is guaranteed
that any passed MultiMethod implementation will only be called to service an
RPC if the RPC method name is not present in the key sets of the passed
dictionaries.
It is guaranteed that any passed interfaces.MultiMethodImplementation will
only be called to service an RPC if there is no
interfaces.MethodImplementation for the RPC method in the passed
method_implementations dictionary.
Args:
pool: A thread pool.
inline_value_in_value_out_methods: A dictionary mapping method names to
interfaces.InlineValueInValueOutMethod implementations.
inline_value_in_stream_out_methods: A dictionary mapping method names to
interfaces.InlineValueInStreamOutMethod implementations.
inline_stream_in_value_out_methods: A dictionary mapping method names to
interfaces.InlineStreamInValueOutMethod implementations.
inline_stream_in_stream_out_methods: A dictionary mapping method names to
interfaces.InlineStreamInStreamOutMethod implementations.
event_value_in_value_out_methods: A dictionary mapping method names to
interfaces.EventValueInValueOutMethod implementations.
event_value_in_stream_out_methods: A dictionary mapping method names to
interfaces.EventValueInStreamOutMethod implementations.
event_stream_in_value_out_methods: A dictionary mapping method names to
interfaces.EventStreamInValueOutMethod implementations.
event_stream_in_stream_out_methods: A dictionary mapping method names to
interfaces.EventStreamInStreamOutMethod implementations.
multi_method: An implementation of interfaces.MultiMethod.
method_implementations: A dictionary from RPC method name to
interfaces.MethodImplementation object to be used to service the named
RPC method.
multi_method_implementation: An interfaces.MultiMethodImplementation to be
used to service any RPCs not serviced by the
interfaces.MethodImplementations given in the method_implementations
dictionary, or None.
Returns:
A base_interfaces.Servicer that services RPCs via the given implementations.
"""
methods = _aggregate_methods(
pool,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods)
adapted_implementations = _adapt_method_implementations(
method_implementations, pool)
return _BaseServicer(adapted_implementations, multi_method_implementation)
return _BaseServicer(methods, multi_method)
def generic_stub(front, pool):
"""Creates an interfaces.GenericStub.
def server():
"""Creates an interfaces.Server.
Args:
front: A base_interfaces.Front.
pool: A futures.ThreadPoolExecutor.
Returns:
An interfaces.Server.
An interfaces.GenericStub that performs RPCs via the given
base_interfaces.Front.
"""
return _Server()
return _GenericStub(front, pool)
def stub(front, pool):
"""Creates an interfaces.Stub.
def dynamic_stub(cardinalities, front, pool, prefix):
"""Creates an interfaces.DynamicStub.
Args:
cardinalities: A dict from RPC method name to cardinality.Cardinality
value identifying the cardinality of every RPC method to be supported by
the created interfaces.DynamicStub.
front: A base_interfaces.Front.
pool: A futures.ThreadPoolExecutor.
prefix: A string to prepend when mapping requested attribute name to RPC
method name during attribute access on the created
interfaces.DynamicStub.
Returns:
An interfaces.Stub that performs RPCs via the given base_interfaces.Front.
An interfaces.DynamicStub that performs RPCs via the given
base_interfaces.Front.
"""
return _Stub(front, pool)
return _DynamicStub(cardinalities, front, pool, prefix)

@ -32,11 +32,24 @@
import abc
import enum
# exceptions, abandonment, and future are referenced from specification in this
# module.
# cardinality, style, exceptions, abandonment, future, and stream are
# referenced from specification in this module.
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.common import style # pylint: disable=unused-import
from grpc.framework.face import exceptions # pylint: disable=unused-import
from grpc.framework.foundation import abandonment # pylint: disable=unused-import
from grpc.framework.foundation import future # pylint: disable=unused-import
from grpc.framework.foundation import stream # pylint: disable=unused-import
@enum.unique
class Abortion(enum.Enum):
"""Categories of RPC abortion."""
CANCELLED = 'cancelled'
EXPIRED = 'expired'
NETWORK_FAILURE = 'network failure'
SERVICED_FAILURE = 'serviced failure'
SERVICER_FAILURE = 'servicer failure'
class CancellableIterator(object):
@ -59,69 +72,61 @@ class CancellableIterator(object):
raise NotImplementedError()
class UnaryUnarySyncAsync(object):
"""Affords invoking a unary-unary RPC synchronously or asynchronously.
Values implementing this interface are directly callable and present an
"async" method. Both calls take a request value and a numeric timeout.
Direct invocation of a value of this type invokes its associated RPC and
blocks until the RPC's response is available. Calling the "async" method
of a value of this type invokes its associated RPC and immediately returns a
future.Future bound to the asynchronous execution of the RPC.
"""
class RpcContext(object):
"""Provides RPC-related information and action."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request, timeout):
"""Synchronously invokes the underlying RPC.
def is_active(self):
"""Describes whether the RPC is active or has terminated."""
raise NotImplementedError()
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
@abc.abstractmethod
def time_remaining(self):
"""Describes the length of allowed time remaining for the RPC.
Returns:
The response value for the RPC.
Raises:
exceptions.RpcError: Indicating that the RPC was aborted.
A nonnegative float indicating the length of allowed time in seconds
remaining for the RPC to complete before it is considered to have timed
out.
"""
raise NotImplementedError()
@abc.abstractmethod
def async(self, request, timeout):
"""Asynchronously invokes the underlying RPC.
def add_abortion_callback(self, abortion_callback):
"""Registers a callback to be called if the RPC is aborted.
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A future.Future representing the RPC. In the event of RPC completion, the
returned Future's result value will be the response value of the RPC.
In the event of RPC abortion, the returned Future's exception value
will be an exceptions.RpcError.
abortion_callback: A callable to be called and passed an Abortion value
in the event of RPC abortion.
"""
raise NotImplementedError()
class StreamUnarySyncAsync(object):
"""Affords invoking a stream-unary RPC synchronously or asynchronously.
class Call(object):
"""Invocation-side representation of an RPC.
Values implementing this interface are directly callable and present an
"async" method. Both calls take an iterator of request values and a numeric
timeout. Direct invocation of a value of this type invokes its associated RPC
and blocks until the RPC's response is available. Calling the "async" method
of a value of this type invokes its associated RPC and immediately returns a
future.Future bound to the asynchronous execution of the RPC.
Attributes:
context: An RpcContext affording information about the RPC.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request_iterator, timeout):
def cancel(self):
"""Requests cancellation of the RPC."""
raise NotImplementedError()
class UnaryUnaryMultiCallable(object):
"""Affords invoking a unary-unary RPC in any call style."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request, timeout):
"""Synchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
@ -133,11 +138,11 @@ class StreamUnarySyncAsync(object):
raise NotImplementedError()
@abc.abstractmethod
def async(self, request, timeout):
def future(self, request, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
@ -148,248 +153,204 @@ class StreamUnarySyncAsync(object):
"""
raise NotImplementedError()
@enum.unique
class Abortion(enum.Enum):
"""Categories of RPC abortion."""
CANCELLED = 'cancelled'
EXPIRED = 'expired'
NETWORK_FAILURE = 'network failure'
SERVICED_FAILURE = 'serviced failure'
SERVICER_FAILURE = 'servicer failure'
class RpcContext(object):
"""Provides RPC-related information and action."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def is_active(self):
"""Describes whether the RPC is active or has terminated."""
raise NotImplementedError()
@abc.abstractmethod
def time_remaining(self):
"""Describes the length of allowed time remaining for the RPC.
Returns:
A nonnegative float indicating the length of allowed time in seconds
remaining for the RPC to complete before it is considered to have timed
out.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_abortion_callback(self, abortion_callback):
"""Registers a callback to be called if the RPC is aborted.
def event(self, request, response_callback, abortion_callback, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
abortion_callback: A callable to be called and passed an Abortion value
request: The request value for the RPC.
response_callback: A callback to be called to accept the restponse value
of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A Call object for the RPC.
"""
raise NotImplementedError()
class InlineValueInValueOutMethod(object):
"""A type for inline unary-request-unary-response RPC methods."""
class UnaryStreamMultiCallable(object):
"""Affords invoking a unary-stream RPC in any call style."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, context):
"""Services an RPC that accepts one value and produces one value.
def __call__(self, request, timeout):
"""Synchronously invokes the underlying RPC.
Args:
request: The single request value for the RPC.
context: An RpcContext object.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
The single response value for the RPC.
Raises:
abandonment.Abandoned: If no response is necessary because the RPC has
been aborted.
A CancellableIterator that yields the response values of the RPC and
affords RPC cancellation. Drawing response values from the returned
CancellableIterator may raise exceptions.RpcError indicating abortion
of the RPC.
"""
raise NotImplementedError()
class InlineValueInStreamOutMethod(object):
"""A type for inline unary-request-stream-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, context):
"""Services an RPC that accepts one value and produces a stream of values.
def event(self, request, response_consumer, abortion_callback, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
request: The single request value for the RPC.
context: An RpcContext object.
Yields:
The values that comprise the response stream of the RPC.
request: The request value for the RPC.
response_consumer: A stream.Consumer to be called to accept the restponse
values of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Raises:
abandonment.Abandoned: If completing the response stream is not necessary
because the RPC has been aborted.
Returns:
A Call object for the RPC.
"""
raise NotImplementedError()
class InlineStreamInValueOutMethod(object):
"""A type for inline stream-request-unary-response RPC methods."""
class StreamUnaryMultiCallable(object):
"""Affords invoking a stream-unary RPC in any call style."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request_iterator, context):
"""Services an RPC that accepts a stream of values and produces one value.
def __call__(self, request_iterator, timeout):
"""Synchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields the request values of the RPC.
Drawing values from this iterator may also raise exceptions.RpcError to
indicate abortion of the RPC.
context: An RpcContext object.
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Yields:
The values that comprise the response stream of the RPC.
Returns:
The response value for the RPC.
Raises:
abandonment.Abandoned: If no response is necessary because the RPC has
been aborted.
exceptions.RpcError: Implementations of this method must not deliberately
raise exceptions.RpcError but may allow such errors raised by the
request_iterator passed to them to propagate through their bodies
uncaught.
exceptions.RpcError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
class InlineStreamInStreamOutMethod(object):
"""A type for inline stream-request-stream-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request_iterator, context):
"""Services an RPC that accepts and produces streams of values.
def future(self, request_iterator, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields the request values of the RPC.
Drawing values from this iterator may also raise exceptions.RpcError to
indicate abortion of the RPC.
context: An RpcContext object.
Yields:
The values that comprise the response stream of the RPC.
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Raises:
abandonment.Abandoned: If completing the response stream is not necessary
because the RPC has been aborted.
exceptions.RpcError: Implementations of this method must not deliberately
raise exceptions.RpcError but may allow such errors raised by the
request_iterator passed to them to propagate through their bodies
uncaught.
Returns:
A future.Future representing the RPC. In the event of RPC completion, the
returned Future's result value will be the response value of the RPC.
In the event of RPC abortion, the returned Future's exception value
will be an exceptions.RpcError.
"""
raise NotImplementedError()
class EventValueInValueOutMethod(object):
"""A type for event-driven unary-request-unary-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, response_callback, context):
"""Services an RPC that accepts one value and produces one value.
def event(self, response_callback, abortion_callback, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
request: The single request value for the RPC.
response_callback: A callback to be called to accept the response value of
the RPC.
context: An RpcContext object.
request: The request value for the RPC.
response_callback: A callback to be called to accept the restponse value
of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the
request values of the RPC should be passed.
"""
raise NotImplementedError()
class EventValueInStreamOutMethod(object):
"""A type for event-driven unary-request-stream-response RPC methods."""
class StreamStreamMultiCallable(object):
"""Affords invoking a stream-stream RPC in any call style."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, response_consumer, context):
"""Services an RPC that accepts one value and produces a stream of values.
def __call__(self, request_iterator, timeout):
"""Synchronously invokes the underlying RPC.
Args:
request: The single request value for the RPC.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
context: An RpcContext object.
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
Returns:
A CancellableIterator that yields the response values of the RPC and
affords RPC cancellation. Drawing response values from the returned
CancellableIterator may raise exceptions.RpcError indicating abortion
of the RPC.
"""
raise NotImplementedError()
class EventStreamInValueOutMethod(object):
"""A type for event-driven stream-request-unary-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, response_callback, context):
"""Services an RPC that accepts a stream of values and produces one value.
def event(self, response_consumer, abortion_callback, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
response_callback: A callback to be called to accept the response value of
the RPC.
context: An RpcContext object.
l Args:
response_consumer: A stream.Consumer to be called to accept the restponse
values of the RPC.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A stream.Consumer with which to accept the request values of the RPC. The
consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing values to this object. Implementations must not assume that this
object will be called to completion of the request stream or even called
at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
A pair of a Call object for the RPC and a stream.Consumer to which the
request values of the RPC should be passed.
"""
raise NotImplementedError()
class EventStreamInStreamOutMethod(object):
"""A type for event-driven stream-request-stream-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, response_consumer, context):
"""Services an RPC that accepts and produces streams of values.
Args:
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
context: An RpcContext object.
Returns:
A stream.Consumer with which to accept the request values of the RPC. The
consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing values to this object. Implementations must not assume that this
object will be called to completion of the request stream or even called
at all.
class MethodImplementation(object):
"""A sum type that describes an RPC method implementation.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
Attributes:
cardinality: A cardinality.Cardinality value.
style: A style.Service value.
unary_unary_inline: The implementation of the RPC method as a callable
value that takes a request value and an RpcContext object and returns a
response value. Only non-None if cardinality is
cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
unary_stream_inline: The implementation of the RPC method as a callable
value that takes a request value and an RpcContext object and returns an
iterator of response values. Only non-None if cardinality is
cardinality.Cardinality.UNARY_STREAM and style is style.Service.INLINE.
stream_unary_inline: The implementation of the RPC method as a callable
value that takes an iterator of request values and an RpcContext object
and returns a response value. Only non-None if cardinality is
cardinality.Cardinality.STREAM_UNARY and style is style.Service.INLINE.
stream_stream_inline: The implementation of the RPC method as a callable
value that takes an iterator of request values and an RpcContext object
and returns an iterator of response values. Only non-None if cardinality
is cardinality.Cardinality.STREAM_STREAM and style is
style.Service.INLINE.
unary_unary_event: The implementation of the RPC method as a callable value
that takes a request value, a response callback to which to pass the
response value of the RPC, and an RpcContext. Only non-None if
cardinality is cardinality.Cardinality.UNARY_UNARY and style is
style.Service.EVENT.
unary_stream_event: The implementation of the RPC method as a callable
value that takes a request value, a stream.Consumer to which to pass the
the response values of the RPC, and an RpcContext. Only non-None if
cardinality is cardinality.Cardinality.UNARY_STREAM and style is
style.Service.EVENT.
stream_unary_event: The implementation of the RPC method as a callable
value that takes a response callback to which to pass the response value
of the RPC and an RpcContext and returns a stream.Consumer to which the
request values of the RPC should be passed. Only non-None if cardinality
is cardinality.Cardinality.STREAM_UNARY and style is style.Service.EVENT.
stream_stream_event: The implementation of the RPC method as a callable
value that takes a stream.Consumer to which to pass the response values
of the RPC and an RpcContext and returns a stream.Consumer to which the
request values of the RPC should be passed. Only non-None if cardinality
is cardinality.Cardinality.STREAM_STREAM and style is
style.Service.EVENT.
"""
raise NotImplementedError()
__metaclass__ = abc.ABCMeta
class MultiMethod(object):
class MultiMethodImplementation(object):
"""A general type able to service many RPC methods."""
__metaclass__ = abc.ABCMeta
@ -420,26 +381,7 @@ class MultiMethod(object):
raise NotImplementedError()
class Server(object):
"""Specification of a running server that services RPCs."""
__metaclass__ = abc.ABCMeta
class Call(object):
"""Invocation-side representation of an RPC.
Attributes:
context: An RpcContext affording information about the RPC.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def cancel(self):
"""Requests cancellation of the RPC."""
raise NotImplementedError()
class Stub(object):
class GenericStub(object):
"""Affords RPC methods to callers."""
__metaclass__ = abc.ABCMeta
@ -632,25 +574,67 @@ class Stub(object):
raise NotImplementedError()
@abc.abstractmethod
def unary_unary_sync_async(self, name):
"""Creates a UnaryUnarySyncAsync value for a unary-unary RPC method.
def unary_unary_multi_callable(self, name):
"""Creates a UnaryUnaryMultiCallable for a unary-unary RPC method.
Args:
name: The RPC method name.
Returns:
A UnaryUnarySyncAsync value for the named unary-unary RPC method.
A UnaryUnaryMultiCallable value for the named unary-unary RPC method.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_unary_sync_async(self, name):
"""Creates a StreamUnarySyncAsync value for a stream-unary RPC method.
def unary_stream_multi_callable(self, name):
"""Creates a UnaryStreamMultiCallable for a unary-stream RPC method.
Args:
name: The RPC method name.
Returns:
A StreamUnarySyncAsync value for the named stream-unary RPC method.
A UnaryStreamMultiCallable value for the name unary-stream RPC method.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_unary_multi_callable(self, name):
"""Creates a StreamUnaryMultiCallable for a stream-unary RPC method.
Args:
name: The RPC method name.
Returns:
A StreamUnaryMultiCallable value for the named stream-unary RPC method.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_stream_multi_callable(self, name):
"""Creates a StreamStreamMultiCallable for a stream-stream RPC method.
Args:
name: The RPC method name.
Returns:
A StreamStreamMultiCallable value for the named stream-stream RPC method.
"""
raise NotImplementedError()
class DynamicStub(object):
"""A stub with RPC-method-bound multi-callable attributes.
Instances of this type responsd to attribute access as follows: if the
requested attribute is the name of a unary-unary RPC method, the value of the
attribute will be a UnaryUnaryMultiCallable with which to invoke the RPC
method; if the requested attribute is the name of a unary-stream RPC method,
the value of the attribute will be a UnaryStreamMultiCallable with which to
invoke the RPC method; if the requested attribute is the name of a
stream-unary RPC method, the value of the attribute will be a
StreamUnaryMultiCallable with which to invoke the RPC method; and if the
requested attribute is the name of a stream-stream RPC method, the value of
the attribute will be a StreamStreamMultiCallable with which to invoke the
RPC method.
"""
__metaclass__ = abc.ABCMeta

@ -61,13 +61,9 @@ class BlockingInvocationInlineServiceTestCase(
self.digest = digest.digest(
stock_service.STOCK_TEST_SERVICE, self.control, None)
self.server, self.stub, self.memo = self.set_up_implementation(
self.stub, self.memo = self.set_up_implementation(
self.digest.name, self.digest.methods,
self.digest.inline_unary_unary_methods,
self.digest.inline_unary_stream_methods,
self.digest.inline_stream_unary_methods,
self.digest.inline_stream_stream_methods,
{}, {}, {}, {}, None)
self.digest.inline_method_implementations, None)
def tearDown(self):
"""See unittest.TestCase.tearDown for full specification.
@ -147,8 +143,8 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
sync_async = self.stub.unary_unary_sync_async(name)
sync_async(request, _TIMEOUT)
multi_callable = self.stub.unary_unary_multi_callable(name)
multi_callable(request, _TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -170,8 +166,8 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
sync_async = self.stub.stream_unary_sync_async(name)
sync_async(iter(requests), _TIMEOUT)
multi_callable = self.stub.stream_unary_multi_callable(name)
multi_callable(iter(requests), _TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (

@ -34,6 +34,8 @@ import threading
# testing_control, interfaces, and testing_service are referenced from
# specification in this module.
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.face import exceptions
from grpc.framework.face import interfaces as face_interfaces
from grpc.framework.face.testing import control as testing_control # pylint: disable=unused-import
@ -50,15 +52,9 @@ class TestServiceDigest(
'TestServiceDigest',
['name',
'methods',
'inline_unary_unary_methods',
'inline_unary_stream_methods',
'inline_stream_unary_methods',
'inline_stream_stream_methods',
'event_unary_unary_methods',
'event_unary_stream_methods',
'event_stream_unary_methods',
'event_stream_stream_methods',
'multi_method',
'inline_method_implementations',
'event_method_implementations',
'multi_method_implementation',
'unary_unary_messages_sequences',
'unary_stream_messages_sequences',
'stream_unary_messages_sequences',
@ -69,32 +65,14 @@ class TestServiceDigest(
name: The RPC service name to be used in the test.
methods: A sequence of interfaces.Method objects describing the RPC
methods that will be called during the test.
inline_unary_unary_methods: A dict from method name to
face_interfaces.InlineValueInValueOutMethod object to be used in tests of
inline_method_implementations: A dict from RPC method name to
face_interfaces.MethodImplementation object to be used in tests of
in-line calls to behaviors under test.
inline_unary_stream_methods: A dict from method name to
face_interfaces.InlineValueInStreamOutMethod object to be used in tests of
in-line calls to behaviors under test.
inline_stream_unary_methods: A dict from method name to
face_interfaces.InlineStreamInValueOutMethod object to be used in tests of
in-line calls to behaviors under test.
inline_stream_stream_methods: A dict from method name to
face_interfaces.InlineStreamInStreamOutMethod object to be used in tests
of in-line calls to behaviors under test.
event_unary_unary_methods: A dict from method name to
face_interfaces.EventValueInValueOutMethod object to be used in tests of
event-driven calls to behaviors under test.
event_unary_stream_methods: A dict from method name to
face_interfaces.EventValueInStreamOutMethod object to be used in tests of
event-driven calls to behaviors under test.
event_stream_unary_methods: A dict from method name to
face_interfaces.EventStreamInValueOutMethod object to be used in tests of
event_method_implementations: A dict from RPC method name to
face_interfaces.MethodImplementation object to be used in tests of
event-driven calls to behaviors under test.
event_stream_stream_methods: A dict from method name to
face_interfaces.EventStreamInStreamOutMethod object to be used in tests of
event-driven calls to behaviors under test.
multi_method: A face_interfaces.MultiMethod to be used in tests of generic
calls to behaviors under test.
multi_method_implementation: A face_interfaces.MultiMethodImplementation to
be used in tests of generic calls to behaviors under test.
unary_unary_messages_sequences: A dict from method name to sequence of
service.UnaryUnaryTestMessages objects to be used to test the method
with the given name.
@ -130,27 +108,33 @@ class _BufferingConsumer(stream.Consumer):
self.terminated = True
class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod):
class _InlineUnaryUnaryMethod(face_interfaces.MethodImplementation):
def __init__(self, unary_unary_test_method, control):
self._test_method = unary_unary_test_method
self._control = control
def service(self, request, context):
self.cardinality = cardinality.Cardinality.UNARY_UNARY
self.style = style.Service.INLINE
def unary_unary_inline(self, request, context):
response_list = []
self._test_method.service(
request, response_list.append, context, self._control)
return response_list.pop(0)
class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod):
class _EventUnaryUnaryMethod(face_interfaces.MethodImplementation):
def __init__(self, unary_unary_test_method, control, pool):
self._test_method = unary_unary_test_method
self._control = control
self._pool = pool
def service(self, request, response_callback, context):
self.cardinality = cardinality.Cardinality.UNARY_UNARY
self.style = style.Service.EVENT
def unary_unary_event(self, request, response_callback, context):
if self._pool is None:
self._test_method.service(
request, response_callback, context, self._control)
@ -160,13 +144,16 @@ class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod):
self._control)
class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod):
class _InlineUnaryStreamMethod(face_interfaces.MethodImplementation):
def __init__(self, unary_stream_test_method, control):
self._test_method = unary_stream_test_method
self._control = control
def service(self, request, context):
self.cardinality = cardinality.Cardinality.UNARY_STREAM
self.style = style.Service.INLINE
def unary_stream_inline(self, request, context):
response_consumer = _BufferingConsumer()
self._test_method.service(
request, response_consumer, context, self._control)
@ -174,14 +161,17 @@ class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod):
yield response
class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod):
class _EventUnaryStreamMethod(face_interfaces.MethodImplementation):
def __init__(self, unary_stream_test_method, control, pool):
self._test_method = unary_stream_test_method
self._control = control
self._pool = pool
def service(self, request, response_consumer, context):
self.cardinality = cardinality.Cardinality.UNARY_STREAM
self.style = style.Service.EVENT
def unary_stream_event(self, request, response_consumer, context):
if self._pool is None:
self._test_method.service(
request, response_consumer, context, self._control)
@ -191,13 +181,16 @@ class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod):
self._control)
class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod):
class _InlineStreamUnaryMethod(face_interfaces.MethodImplementation):
def __init__(self, stream_unary_test_method, control):
self._test_method = stream_unary_test_method
self._control = control
def service(self, request_iterator, context):
self.cardinality = cardinality.Cardinality.STREAM_UNARY
self.style = style.Service.INLINE
def stream_unary_inline(self, request_iterator, context):
response_list = []
request_consumer = self._test_method.service(
response_list.append, context, self._control)
@ -207,14 +200,17 @@ class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod):
return response_list.pop(0)
class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod):
class _EventStreamUnaryMethod(face_interfaces.MethodImplementation):
def __init__(self, stream_unary_test_method, control, pool):
self._test_method = stream_unary_test_method
self._control = control
self._pool = pool
def service(self, response_callback, context):
self.cardinality = cardinality.Cardinality.STREAM_UNARY
self.style = style.Service.EVENT
def stream_unary_event(self, response_callback, context):
request_consumer = self._test_method.service(
response_callback, context, self._control)
if self._pool is None:
@ -223,13 +219,16 @@ class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod):
return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod):
class _InlineStreamStreamMethod(face_interfaces.MethodImplementation):
def __init__(self, stream_stream_test_method, control):
self._test_method = stream_stream_test_method
self._control = control
def service(self, request_iterator, context):
self.cardinality = cardinality.Cardinality.STREAM_STREAM
self.style = style.Service.INLINE
def stream_stream_inline(self, request_iterator, context):
response_consumer = _BufferingConsumer()
request_consumer = self._test_method.service(
response_consumer, context, self._control)
@ -241,14 +240,17 @@ class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod):
response_consumer.terminate()
class _EventStreamStreamMethod(face_interfaces.EventStreamInStreamOutMethod):
class _EventStreamStreamMethod(face_interfaces.MethodImplementation):
def __init__(self, stream_stream_test_method, control, pool):
self._test_method = stream_stream_test_method
self._control = control
self._pool = pool
def service(self, response_consumer, context):
self.cardinality = cardinality.Cardinality.STREAM_STREAM
self.style = style.Service.EVENT
def stream_stream_event(self, response_consumer, context):
request_consumer = self._test_method.service(
response_consumer, context, self._control)
if self._pool is None:
@ -332,7 +334,7 @@ class _StreamUnaryAdaptation(object):
response_consumer.consume_and_terminate, context, control)
class _MultiMethod(face_interfaces.MultiMethod):
class _MultiMethodImplementation(face_interfaces.MultiMethodImplementation):
def __init__(self, methods, control, pool):
self._methods = methods
@ -427,19 +429,21 @@ def digest(service, control, pool):
adaptations.update(unary_stream.adaptations)
adaptations.update(stream_unary.adaptations)
adaptations.update(stream_stream.adaptations)
inlines = dict(unary_unary.inlines)
inlines.update(unary_stream.inlines)
inlines.update(stream_unary.inlines)
inlines.update(stream_stream.inlines)
events = dict(unary_unary.events)
events.update(unary_stream.events)
events.update(stream_unary.events)
events.update(stream_stream.events)
return TestServiceDigest(
service.name(),
methods,
unary_unary.inlines,
unary_stream.inlines,
stream_unary.inlines,
stream_stream.inlines,
unary_unary.events,
unary_stream.events,
stream_unary.events,
stream_stream.events,
_MultiMethod(adaptations, control, pool),
inlines,
events,
_MultiMethodImplementation(adaptations, control, pool),
unary_unary.messages,
unary_stream.messages,
stream_unary.messages,

@ -60,14 +60,9 @@ class EventInvocationSynchronousEventServiceTestCase(
self.digest = digest.digest(
stock_service.STOCK_TEST_SERVICE, self.control, None)
self.server, self.stub, self.memo = self.set_up_implementation(
self.stub, self.memo = self.set_up_implementation(
self.digest.name, self.digest.methods,
{}, {}, {}, {},
self.digest.event_unary_unary_methods,
self.digest.event_unary_stream_methods,
self.digest.event_stream_unary_methods,
self.digest.event_stream_stream_methods,
None)
self.digest.event_method_implementations, None)
def tearDown(self):
"""See unittest.TestCase.tearDown for full specification.

@ -91,14 +91,9 @@ class FutureInvocationAsynchronousEventServiceTestCase(
self.digest = digest.digest(
stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool)
self.server, self.stub, self.memo = self.set_up_implementation(
self.stub, self.memo = self.set_up_implementation(
self.digest.name, self.digest.methods,
{}, {}, {}, {},
self.digest.event_unary_unary_methods,
self.digest.event_unary_stream_methods,
self.digest.event_stream_unary_methods,
self.digest.event_stream_stream_methods,
None)
self.digest.event_method_implementations, None)
def tearDown(self):
"""See unittest.TestCase.tearDown for full specification.
@ -190,8 +185,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
with self.control.pause():
sync_async = self.stub.unary_unary_sync_async(name)
response_future = sync_async.async(request, _TIMEOUT)
multi_callable = self.stub.unary_unary_multi_callable(name)
response_future = multi_callable.future(request, _TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@ -216,8 +211,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
requests = test_messages.requests()
with self.control.pause():
sync_async = self.stub.stream_unary_sync_async(name)
response_future = sync_async.async(iter(requests), _TIMEOUT)
multi_callable = self.stub.stream_unary_multi_callable(name)
response_future = multi_callable.future(iter(requests), _TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):

@ -36,8 +36,8 @@ from grpc.framework.face import interfaces as face_interfaces # pylint: disable
from grpc.framework.face.testing import interfaces
class UnaryUnaryTestMethod(interfaces.Method):
"""Like face_interfaces.EventValueInValueOutMethod but with a control."""
class UnaryUnaryTestMethodImplementation(interfaces.Method):
"""A controllable implementation of a unary-unary RPC method."""
__metaclass__ = abc.ABCMeta
@ -93,8 +93,8 @@ class UnaryUnaryTestMessages(object):
raise NotImplementedError()
class UnaryStreamTestMethod(interfaces.Method):
"""Like face_interfaces.EventValueInStreamOutMethod but with a control."""
class UnaryStreamTestMethodImplementation(interfaces.Method):
"""A controllable implementation of a unary-stream RPC method."""
__metaclass__ = abc.ABCMeta
@ -106,7 +106,7 @@ class UnaryStreamTestMethod(interfaces.Method):
request: The single request message for the RPC.
response_consumer: A stream.Consumer to be called to accept the response
messages of the RPC.
context: An RpcContext object.
context: A face_interfaces.RpcContext object.
control: A test_control.Control to control execution of this method.
Raises:
@ -150,8 +150,8 @@ class UnaryStreamTestMessages(object):
raise NotImplementedError()
class StreamUnaryTestMethod(interfaces.Method):
"""Like face_interfaces.EventStreamInValueOutMethod but with a control."""
class StreamUnaryTestMethodImplementation(interfaces.Method):
"""A controllable implementation of a stream-unary RPC method."""
__metaclass__ = abc.ABCMeta
@ -162,7 +162,7 @@ class StreamUnaryTestMethod(interfaces.Method):
Args:
response_callback: A callback to be called to accept the response message
of the RPC.
context: An RpcContext object.
context: A face_interfaces.RpcContext object.
control: A test_control.Control to control execution of this method.
Returns:
@ -214,8 +214,8 @@ class StreamUnaryTestMessages(object):
raise NotImplementedError()
class StreamStreamTestMethod(interfaces.Method):
"""Like face_interfaces.EventStreamInStreamOutMethod but with a control."""
class StreamStreamTestMethodImplementation(interfaces.Method):
"""A controllable implementation of a stream-stream RPC method."""
__metaclass__ = abc.ABCMeta
@ -226,7 +226,7 @@ class StreamStreamTestMethod(interfaces.Method):
Args:
response_consumer: A stream.Consumer to be called to accept the response
messages of the RPC.
context: An RpcContext object.
context: A face_interfaces.RpcContext object.
control: A test_control.Control to control execution of this method.
Returns:
@ -298,8 +298,8 @@ class TestService(object):
Returns:
A dict from method name to pair. The first element of the pair
is a UnaryUnaryTestMethod object and the second element is a sequence
of UnaryUnaryTestMethodMessages objects.
is a UnaryUnaryTestMethodImplementation object and the second element
is a sequence of UnaryUnaryTestMethodMessages objects.
"""
raise NotImplementedError()
@ -309,8 +309,8 @@ class TestService(object):
Returns:
A dict from method name to pair. The first element of the pair is a
UnaryStreamTestMethod object and the second element is a sequence of
UnaryStreamTestMethodMessages objects.
UnaryStreamTestMethodImplementation object and the second element is a
sequence of UnaryStreamTestMethodMessages objects.
"""
raise NotImplementedError()
@ -320,8 +320,8 @@ class TestService(object):
Returns:
A dict from method name to pair. The first element of the pair is a
StreamUnaryTestMethod object and the second element is a sequence of
StreamUnaryTestMethodMessages objects.
StreamUnaryTestMethodImplementation object and the second element is a
sequence of StreamUnaryTestMethodMessages objects.
"""
raise NotImplementedError()
@ -331,7 +331,7 @@ class TestService(object):
Returns:
A dict from method name to pair. The first element of the pair is a
StreamStreamTestMethod object and the second element is a sequence of
StreamStreamTestMethodMessages objects.
StreamStreamTestMethodImplementation object and the second element is a
sequence of StreamStreamTestMethodMessages objects.
"""
raise NotImplementedError()

@ -139,7 +139,7 @@ def _get_highest_trade_price(stock_reply_callback, control, active):
return StockRequestConsumer()
class GetLastTradePrice(service.UnaryUnaryTestMethod):
class GetLastTradePrice(service.UnaryUnaryTestMethodImplementation):
"""GetLastTradePrice for use in tests."""
def name(self):
@ -186,7 +186,7 @@ class GetLastTradePriceMessages(service.UnaryUnaryTestMessages):
test_case.assertEqual(_price(request.symbol), response.price)
class GetLastTradePriceMultiple(service.StreamStreamTestMethod):
class GetLastTradePriceMultiple(service.StreamStreamTestMethodImplementation):
"""GetLastTradePriceMultiple for use in tests."""
def name(self):
@ -238,7 +238,7 @@ class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages):
test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
class WatchFutureTrades(service.UnaryStreamTestMethod):
class WatchFutureTrades(service.UnaryStreamTestMethodImplementation):
"""WatchFutureTrades for use in tests."""
def name(self):
@ -288,7 +288,7 @@ class WatchFutureTradesMessages(service.UnaryStreamTestMessages):
test_case.assertEqual(base_price + index, response.price)
class GetHighestTradePrice(service.StreamUnaryTestMethod):
class GetHighestTradePrice(service.StreamUnaryTestMethodImplementation):
"""GetHighestTradePrice for use in tests."""
def name(self):

@ -46,55 +46,24 @@ class FaceTestCase(object):
@abc.abstractmethod
def set_up_implementation(
self,
name,
methods,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods,
multi_method):
self, name, methods, method_implementations,
multi_method_implementation):
"""Instantiates the Face Layer implementation under test.
Args:
name: The service name to be used in the test.
methods: A sequence of interfaces.Method objects describing the RPC
methods that will be called during the test.
inline_value_in_value_out_methods: A dictionary from string method names
to face_interfaces.InlineValueInValueOutMethod implementations of those
methods.
inline_value_in_stream_out_methods: A dictionary from string method names
to face_interfaces.InlineValueInStreamOutMethod implementations of those
methods.
inline_stream_in_value_out_methods: A dictionary from string method names
to face_interfaces.InlineStreamInValueOutMethod implementations of those
methods.
inline_stream_in_stream_out_methods: A dictionary from string method names
to face_interfaces.InlineStreamInStreamOutMethod implementations of
those methods.
event_value_in_value_out_methods: A dictionary from string method names
to face_interfaces.EventValueInValueOutMethod implementations of those
methods.
event_value_in_stream_out_methods: A dictionary from string method names
to face_interfaces.EventValueInStreamOutMethod implementations of those
methods.
event_stream_in_value_out_methods: A dictionary from string method names
to face_interfaces.EventStreamInValueOutMethod implementations of those
methods.
event_stream_in_stream_out_methods: A dictionary from string method names
to face_interfaces.EventStreamInStreamOutMethod implementations of those
methods.
multi_method: An face_interfaces.MultiMethod, or None.
method_implementations: A dictionary from string RPC method name to
face_interfaces.MethodImplementation object specifying
implementation of an RPC method.
multi_method_implementation: An face_interfaces.MultiMethodImplementation
or None.
Returns:
A sequence of length three the first element of which is a
face_interfaces.Server, the second element of which is a
face_interfaces.Stub, (both of which are backed by the given method
implementations), and the third element of which is an arbitrary memo
A sequence of length two the first element of which is a
face_interfaces.GenericStub (backed by the given method
implementations), and the second element of which is an arbitrary memo
object to be kept and passed to tearDownImplementation at the conclusion
of the test.
"""
@ -105,7 +74,7 @@ class FaceTestCase(object):
"""Destroys the Face layer implementation under test.
Args:
memo: The object from the third position of the return value of
memo: The object from the second position of the return value of
set_up_implementation.
"""
raise NotImplementedError()

@ -27,101 +27,44 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for the face layer of RPC Framework."""
"""Utilities for RPC framework's face layer."""
# stream is referenced from specification in this module.
from grpc.framework.face import interfaces
from grpc.framework.foundation import stream # pylint: disable=unused-import
class _InlineUnaryUnaryMethod(interfaces.InlineValueInValueOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request, context):
return self._behavior(request, context)
class _InlineUnaryStreamMethod(interfaces.InlineValueInStreamOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request, context):
return self._behavior(request, context)
class _InlineStreamUnaryMethod(interfaces.InlineStreamInValueOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request_iterator, context):
return self._behavior(request_iterator, context)
class _InlineStreamStreamMethod(interfaces.InlineStreamInStreamOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request_iterator, context):
return self._behavior(request_iterator, context)
import collections
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.face import interfaces
from grpc.framework.foundation import stream
class _EventUnaryUnaryMethod(interfaces.EventValueInValueOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request, response_callback, context):
return self._behavior(request, response_callback, context)
class _EventUnaryStreamMethod(interfaces.EventValueInStreamOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request, response_consumer, context):
return self._behavior(request, response_consumer, context)
class _EventStreamUnaryMethod(interfaces.EventStreamInValueOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, response_callback, context):
return self._behavior(response_callback, context)
class _EventStreamStreamMethod(interfaces.EventStreamInStreamOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, response_consumer, context):
return self._behavior(response_consumer, context)
class _MethodImplementation(
interfaces.MethodImplementation,
collections.namedtuple(
'_MethodImplementation',
['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline',
'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event',
'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])):
pass
def inline_unary_unary_method(behavior):
"""Creates an interfaces.InlineValueInValueOutMethod from a behavior.
def unary_unary_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable
value that takes a request value and an interfaces.RpcContext object and
behavior: The implementation of a unary-unary RPC method as a callable value
that takes a request value and an interfaces.RpcContext object and
returns a response value.
Returns:
An interfaces.InlineValueInValueOutMethod derived from the given behavior.
An interfaces.MethodImplementation derived from the given behavior.
"""
return _InlineUnaryUnaryMethod(behavior)
return _MethodImplementation(
cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior,
None, None, None, None, None, None, None)
def inline_unary_stream_method(behavior):
"""Creates an interfaces.InlineValueInStreamOutMethod from a behavior.
def unary_stream_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
@ -129,13 +72,15 @@ def inline_unary_stream_method(behavior):
returns an iterator of response values.
Returns:
An interfaces.InlineValueInStreamOutMethod derived from the given behavior.
An interfaces.MethodImplementation derived from the given behavior.
"""
return _InlineUnaryStreamMethod(behavior)
return _MethodImplementation(
cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None,
behavior, None, None, None, None, None, None)
def inline_stream_unary_method(behavior):
"""Creates an interfaces.InlineStreamInValueOutMethod from a behavior.
def stream_unary_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
@ -143,13 +88,15 @@ def inline_stream_unary_method(behavior):
interfaces.RpcContext object and returns a response value.
Returns:
An interfaces.InlineStreamInValueOutMethod derived from the given behavior.
An interfaces.MethodImplementation derived from the given behavior.
"""
return _InlineStreamUnaryMethod(behavior)
return _MethodImplementation(
cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None,
behavior, None, None, None, None, None)
def inline_stream_stream_method(behavior):
"""Creates an interfaces.InlineStreamInStreamOutMethod from a behavior.
def stream_stream_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
@ -157,14 +104,15 @@ def inline_stream_stream_method(behavior):
interfaces.RpcContext object and returns an iterator of response values.
Returns:
An interfaces.InlineStreamInStreamOutMethod derived from the given
behavior.
An interfaces.MethodImplementation derived from the given behavior.
"""
return _InlineStreamStreamMethod(behavior)
return _MethodImplementation(
cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None,
None, behavior, None, None, None, None)
def event_unary_unary_method(behavior):
"""Creates an interfaces.EventValueInValueOutMethod from a behavior.
def unary_unary_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable
@ -172,27 +120,31 @@ def event_unary_unary_method(behavior):
the response value of the RPC, and an interfaces.RpcContext.
Returns:
An interfaces.EventValueInValueOutMethod derived from the given behavior.
An interfaces.MethodImplementation derived from the given behavior.
"""
return _EventUnaryUnaryMethod(behavior)
return _MethodImplementation(
cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None,
None, None, behavior, None, None, None)
def event_unary_stream_method(behavior):
"""Creates an interfaces.EventValueInStreamOutMethod from a behavior.
def unary_stream_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value, a stream.Consumer to which to pass the
response values of the RPC, and an interfaces.RpcContext.
the response values of the RPC, and an interfaces.RpcContext.
Returns:
An interfaces.EventValueInStreamOutMethod derived from the given behavior.
An interfaces.MethodImplementation derived from the given behavior.
"""
return _EventUnaryStreamMethod(behavior)
return _MethodImplementation(
cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None,
None, None, None, behavior, None, None)
def event_stream_unary_method(behavior):
"""Creates an interfaces.EventStreamInValueOutMethod from a behavior.
def stream_unary_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
@ -201,13 +153,15 @@ def event_stream_unary_method(behavior):
which the request values of the RPC should be passed.
Returns:
An interfaces.EventStreamInValueOutMethod derived from the given behavior.
An interfaces.MethodImplementation derived from the given behavior.
"""
return _EventStreamUnaryMethod(behavior)
return _MethodImplementation(
cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None,
None, None, None, None, behavior, None)
def event_stream_stream_method(behavior):
"""Creates an interfaces.EventStreamInStreamOutMethod from a behavior.
def stream_stream_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
@ -216,6 +170,8 @@ def event_stream_stream_method(behavior):
which the request values of the RPC should be passed.
Returns:
An interfaces.EventStreamInStreamOutMethod derived from the given behavior.
An interfaces.MethodImplementation derived from the given behavior.
"""
return _EventStreamStreamMethod(behavior)
return _MethodImplementation(
cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None,
None, None, None, None, None, behavior)

Loading…
Cancel
Save