diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py index 475d780c950..2542eb6da4d 100644 --- a/src/python/src/grpc/_adapter/_face_test_case.py +++ b/src/python/src/grpc/_adapter/_face_test_case.py @@ -50,31 +50,12 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): """Provides abstract Face-layer tests a GRPC-backed implementation.""" def set_up_implementation( - self, - name, - methods, - inline_value_in_value_out_methods, - inline_value_in_stream_out_methods, - inline_stream_in_value_out_methods, - inline_stream_in_stream_out_methods, - event_value_in_value_out_methods, - event_value_in_stream_out_methods, - event_stream_in_value_out_methods, - event_stream_in_stream_out_methods, - multi_method): + self, name, methods, method_implementations, + multi_method_implementation): pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) servicer = face_implementations.servicer( - pool, - inline_value_in_value_out_methods=inline_value_in_value_out_methods, - inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, - inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, - inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, - event_value_in_value_out_methods=event_value_in_value_out_methods, - event_value_in_stream_out_methods=event_value_in_stream_out_methods, - event_stream_in_value_out_methods=event_stream_in_value_out_methods, - event_stream_in_stream_out_methods=event_stream_in_stream_out_methods, - multi_method=multi_method) + pool, method_implementations, multi_method_implementation) serialization = serial.serialization(methods) @@ -96,9 +77,8 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): rear_link.join_fore_link(front) front.join_rear_link(rear_link) - server = face_implementations.server() - stub = face_implementations.stub(front, pool) - return server, stub, (rear_link, fore_link, front, back) + stub = face_implementations.generic_stub(front, pool) + return stub, (rear_link, fore_link, front, back) def tear_down_implementation(self, memo): rear_link, fore_link, front, back = memo diff --git a/src/python/src/grpc/early_adopter/_assembly_utilities.py b/src/python/src/grpc/early_adopter/_face_utilities.py similarity index 76% rename from src/python/src/grpc/early_adopter/_assembly_utilities.py rename to src/python/src/grpc/early_adopter/_face_utilities.py index facfc2bf0e6..2cf576018d4 100644 --- a/src/python/src/grpc/early_adopter/_assembly_utilities.py +++ b/src/python/src/grpc/early_adopter/_face_utilities.py @@ -30,23 +30,20 @@ import abc import collections -# assembly_interfaces is referenced from specification in this module. -from grpc.framework.assembly import interfaces as assembly_interfaces # pylint: disable=unused-import -from grpc.framework.assembly import utilities as assembly_utilities +# face_interfaces is referenced from specification in this module. +from grpc.framework.common import cardinality +from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import +from grpc.framework.face import utilities as face_utilities from grpc.early_adopter import _reexport from grpc.early_adopter import interfaces -# TODO(issue 726): Kill the "implementations" attribute of this in favor -# of the same-information-less-bogusly-represented "cardinalities". class InvocationBreakdown(object): """An intermediate representation of invocation-side views of RPC methods. Attributes: cardinalities: A dictionary from RPC method name to interfaces.Cardinality value. - implementations: A dictionary from RPC method name to - assembly_interfaces.MethodImplementation describing the method. request_serializers: A dictionary from RPC method name to callable behavior to be used serializing request values for the RPC. response_deserializers: A dictionary from RPC method name to callable @@ -59,8 +56,7 @@ class _EasyInvocationBreakdown( InvocationBreakdown, collections.namedtuple( '_EasyInvocationBreakdown', - ('cardinalities', 'implementations', 'request_serializers', - 'response_deserializers'))): + ('cardinalities', 'request_serializers', 'response_deserializers'))): pass @@ -68,8 +64,8 @@ class ServiceBreakdown(object): """An intermediate representation of service-side views of RPC methods. Attributes: - implementations: A dictionary from RPC method name - assembly_interfaces.MethodImplementation implementing the RPC method. + implementations: A dictionary from RPC method name to + face_interfaces.MethodImplementation implementing the RPC method. request_deserializers: A dictionary from RPC method name to callable behavior to be used deserializing request values for the RPC. response_serializers: A dictionary from RPC method name to callable @@ -97,25 +93,14 @@ def break_down_invocation(method_descriptions): An InvocationBreakdown corresponding to the given method descriptions. """ cardinalities = {} - implementations = {} request_serializers = {} response_deserializers = {} for name, method_description in method_descriptions.iteritems(): - cardinality = method_description.cardinality() - cardinalities[name] = cardinality - if cardinality is interfaces.Cardinality.UNARY_UNARY: - implementations[name] = assembly_utilities.unary_unary_inline(None) - elif cardinality is interfaces.Cardinality.UNARY_STREAM: - implementations[name] = assembly_utilities.unary_stream_inline(None) - elif cardinality is interfaces.Cardinality.STREAM_UNARY: - implementations[name] = assembly_utilities.stream_unary_inline(None) - elif cardinality is interfaces.Cardinality.STREAM_STREAM: - implementations[name] = assembly_utilities.stream_stream_inline(None) + cardinalities[name] = method_description.cardinality() request_serializers[name] = method_description.serialize_request response_deserializers[name] = method_description.deserialize_response return _EasyInvocationBreakdown( - cardinalities, implementations, request_serializers, - response_deserializers) + cardinalities, request_serializers, response_deserializers) def break_down_service(method_descriptions): @@ -139,28 +124,28 @@ def break_down_service(method_descriptions): service_behavior=method_description.service_unary_unary): return service_behavior( request, _reexport.rpc_context(face_rpc_context)) - implementations[name] = assembly_utilities.unary_unary_inline(service) + implementations[name] = face_utilities.unary_unary_inline(service) elif cardinality is interfaces.Cardinality.UNARY_STREAM: def service( request, face_rpc_context, service_behavior=method_description.service_unary_stream): return service_behavior( request, _reexport.rpc_context(face_rpc_context)) - implementations[name] = assembly_utilities.unary_stream_inline(service) + implementations[name] = face_utilities.unary_stream_inline(service) elif cardinality is interfaces.Cardinality.STREAM_UNARY: def service( request_iterator, face_rpc_context, service_behavior=method_description.service_stream_unary): return service_behavior( request_iterator, _reexport.rpc_context(face_rpc_context)) - implementations[name] = assembly_utilities.stream_unary_inline(service) + implementations[name] = face_utilities.stream_unary_inline(service) elif cardinality is interfaces.Cardinality.STREAM_STREAM: def service( request_iterator, face_rpc_context, service_behavior=method_description.service_stream_stream): return service_behavior( request_iterator, _reexport.rpc_context(face_rpc_context)) - implementations[name] = assembly_utilities.stream_stream_inline(service) + implementations[name] = face_utilities.stream_stream_inline(service) request_deserializers[name] = method_description.deserialize_request response_serializers[name] = method_description.serialize_response diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/early_adopter/_reexport.py index 35f4e85a728..3fed8099f6f 100644 --- a/src/python/src/grpc/early_adopter/_reexport.py +++ b/src/python/src/grpc/early_adopter/_reexport.py @@ -27,12 +27,20 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from grpc.framework.common import cardinality from grpc.framework.face import exceptions as face_exceptions from grpc.framework.face import interfaces as face_interfaces from grpc.framework.foundation import future from grpc.early_adopter import exceptions from grpc.early_adopter import interfaces +_EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY = { + interfaces.Cardinality.UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, + interfaces.Cardinality.UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, + interfaces.Cardinality.STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, + interfaces.Cardinality.STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, +} + _ABORTION_REEXPORT = { face_interfaces.Abortion.CANCELLED: interfaces.Abortion.CANCELLED, face_interfaces.Abortion.EXPIRED: interfaces.Abortion.EXPIRED, @@ -142,28 +150,28 @@ class _RpcContext(interfaces.RpcContext): class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync): - def __init__(self, face_unary_unary_sync_async): - self._underlying = face_unary_unary_sync_async + def __init__(self, face_unary_unary_multi_callable): + self._underlying = face_unary_unary_multi_callable def __call__(self, request, timeout): return _call_reexporting_errors( self._underlying, request, timeout) def async(self, request, timeout): - return _ReexportedFuture(self._underlying.async(request, timeout)) + return _ReexportedFuture(self._underlying.future(request, timeout)) class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync): - def __init__(self, face_stream_unary_sync_async): - self._underlying = face_stream_unary_sync_async + def __init__(self, face_stream_unary_multi_callable): + self._underlying = face_stream_unary_multi_callable def __call__(self, request_iterator, timeout): return _call_reexporting_errors( self._underlying, request_iterator, timeout) def async(self, request_iterator, timeout): - return _ReexportedFuture(self._underlying.async(request_iterator, timeout)) + return _ReexportedFuture(self._underlying.future(request_iterator, timeout)) class _Stub(interfaces.Stub): @@ -182,31 +190,40 @@ class _Stub(interfaces.Stub): def __getattr__(self, attr): underlying_attr = self._assembly_stub.__getattr__(attr) - cardinality = self._cardinalities.get(attr) + method_cardinality = self._cardinalities.get(attr) # TODO(nathaniel): unify this trick with its other occurrence in the code. - if cardinality is None: - for name, cardinality in self._cardinalities.iteritems(): + if method_cardinality is None: + for name, method_cardinality in self._cardinalities.iteritems(): last_slash_index = name.rfind('/') if 0 <= last_slash_index and name[last_slash_index + 1:] == attr: break else: raise AttributeError(attr) - if cardinality is interfaces.Cardinality.UNARY_UNARY: + if method_cardinality is interfaces.Cardinality.UNARY_UNARY: return _UnaryUnarySyncAsync(underlying_attr) - elif cardinality is interfaces.Cardinality.UNARY_STREAM: + elif method_cardinality is interfaces.Cardinality.UNARY_STREAM: return lambda request, timeout: _CancellableIterator( underlying_attr(request, timeout)) - elif cardinality is interfaces.Cardinality.STREAM_UNARY: + elif method_cardinality is interfaces.Cardinality.STREAM_UNARY: return _StreamUnarySyncAsync(underlying_attr) - elif cardinality is interfaces.Cardinality.STREAM_STREAM: + elif method_cardinality is interfaces.Cardinality.STREAM_STREAM: return lambda request_iterator, timeout: _CancellableIterator( underlying_attr(request_iterator, timeout)) else: raise AttributeError(attr) + +def common_cardinalities(early_adopter_cardinalities): + common_cardinalities = {} + for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems(): + common_cardinalities[name] = _EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY[ + early_adopter_cardinality] + return common_cardinalities + + def rpc_context(face_rpc_context): return _RpcContext(face_rpc_context) -def stub(assembly_stub, cardinalities): - return _Stub(assembly_stub, cardinalities) +def stub(face_stub, cardinalities): + return _Stub(face_stub, cardinalities) diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index 87ea18d6662..b46f94e9fbf 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -33,7 +33,7 @@ import threading from grpc._adapter import fore as _fore from grpc._adapter import rear as _rear -from grpc.early_adopter import _assembly_utilities +from grpc.early_adopter import _face_utilities from grpc.early_adopter import _reexport from grpc.early_adopter import interfaces from grpc.framework.assembly import implementations as _assembly_implementations @@ -95,12 +95,13 @@ class _Server(interfaces.Server): def _build_stub(breakdown, activated_rear_link): assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub( - breakdown.implementations, activated_rear_link) + _reexport.common_cardinalities(breakdown.cardinalities), + activated_rear_link) return _reexport.stub(assembly_stub, breakdown.cardinalities) def _build_server(methods, port, private_key, certificate_chain): - breakdown = _assembly_utilities.break_down_service(methods) + breakdown = _face_utilities.break_down_service(methods) return _Server(breakdown, port, private_key, certificate_chain) @@ -117,7 +118,7 @@ def insecure_stub(methods, host, port): Returns: An interfaces.Stub affording RPC invocation. """ - breakdown = _assembly_utilities.break_down_invocation(methods) + breakdown = _face_utilities.break_down_invocation(methods) activated_rear_link = _rear.activated_rear_link( host, port, breakdown.request_serializers, breakdown.response_deserializers) @@ -147,7 +148,7 @@ def secure_stub( Returns: An interfaces.Stub affording RPC invocation. """ - breakdown = _assembly_utilities.break_down_invocation(methods) + breakdown = _face_utilities.break_down_invocation(methods) activated_rear_link = _rear.secure_activated_rear_link( host, port, breakdown.request_serializers, breakdown.response_deserializers, root_certificates, private_key, diff --git a/src/python/src/grpc/framework/assembly/implementations.py b/src/python/src/grpc/framework/assembly/implementations.py index f7166ed99d1..24afcbeb6d6 100644 --- a/src/python/src/grpc/framework/assembly/implementations.py +++ b/src/python/src/grpc/framework/assembly/implementations.py @@ -66,7 +66,7 @@ class _FaceStub(object): self._rear_link.start() self._rear_link.join_fore_link(self._front) self._front.join_rear_link(self._rear_link) - self._under_stub = face_implementations.stub(self._front, self._pool) + self._under_stub = face_implementations.generic_stub(self._front, self._pool) def __exit__(self, exc_type, exc_val, exc_tb): with self._lock: @@ -86,18 +86,18 @@ class _FaceStub(object): return getattr(self._under_stub, attr) -def _behaviors(implementations, front, pool): +def _behaviors(method_cardinalities, front, pool): behaviors = {} - stub = face_implementations.stub(front, pool) - for name, implementation in implementations.iteritems(): - if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: - behaviors[name] = stub.unary_unary_sync_async(name) - elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: + stub = face_implementations.generic_stub(front, pool) + for name, method_cardinality in method_cardinalities.iteritems(): + if method_cardinality is cardinality.Cardinality.UNARY_UNARY: + behaviors[name] = stub.unary_unary_multi_callable(name) + elif method_cardinality is cardinality.Cardinality.UNARY_STREAM: behaviors[name] = lambda request, context, bound_name=name: ( stub.inline_value_in_stream_out(bound_name, request, context)) - elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: - behaviors[name] = stub.stream_unary_sync_async(name) - elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: + elif method_cardinality is cardinality.Cardinality.STREAM_UNARY: + behaviors[name] = stub.stream_unary_multi_callable(name) + elif method_cardinality is cardinality.Cardinality.STREAM_STREAM: behaviors[name] = lambda request_iterator, context, bound_name=name: ( stub.inline_stream_in_stream_out( bound_name, request_iterator, context)) @@ -106,8 +106,8 @@ def _behaviors(implementations, front, pool): class _DynamicInlineStub(object): - def __init__(self, implementations, rear_link): - self._implementations = implementations + def __init__(self, cardinalities, rear_link): + self._cardinalities = cardinalities self._rear_link = rear_link self._lock = threading.Lock() self._pool = None @@ -123,7 +123,7 @@ class _DynamicInlineStub(object): self._rear_link.join_fore_link(self._front) self._front.join_rear_link(self._rear_link) self._behaviors = _behaviors( - self._implementations, self._front, self._pool) + self._cardinalities, self._front, self._pool) return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -151,58 +151,6 @@ class _DynamicInlineStub(object): return behavior -def _servicer(implementations, pool): - inline_value_in_value_out_methods = {} - inline_value_in_stream_out_methods = {} - inline_stream_in_value_out_methods = {} - inline_stream_in_stream_out_methods = {} - event_value_in_value_out_methods = {} - event_value_in_stream_out_methods = {} - event_stream_in_value_out_methods = {} - event_stream_in_stream_out_methods = {} - - for name, implementation in implementations.iteritems(): - if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: - if implementation.style is style.Service.INLINE: - inline_value_in_value_out_methods[name] = ( - face_utilities.inline_unary_unary_method(implementation.unary_unary_inline)) - elif implementation.style is style.Service.EVENT: - event_value_in_value_out_methods[name] = ( - face_utilities.event_unary_unary_method(implementation.unary_unary_event)) - elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: - if implementation.style is style.Service.INLINE: - inline_value_in_stream_out_methods[name] = ( - face_utilities.inline_unary_stream_method(implementation.unary_stream_inline)) - elif implementation.style is style.Service.EVENT: - event_value_in_stream_out_methods[name] = ( - face_utilities.event_unary_stream_method(implementation.unary_stream_event)) - if implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: - if implementation.style is style.Service.INLINE: - inline_stream_in_value_out_methods[name] = ( - face_utilities.inline_stream_unary_method(implementation.stream_unary_inline)) - elif implementation.style is style.Service.EVENT: - event_stream_in_value_out_methods[name] = ( - face_utilities.event_stream_unary_method(implementation.stream_unary_event)) - elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: - if implementation.style is style.Service.INLINE: - inline_stream_in_stream_out_methods[name] = ( - face_utilities.inline_stream_stream_method(implementation.stream_stream_inline)) - elif implementation.style is style.Service.EVENT: - event_stream_in_stream_out_methods[name] = ( - face_utilities.event_stream_stream_method(implementation.stream_stream_event)) - - return face_implementations.servicer( - pool, - inline_value_in_value_out_methods=inline_value_in_value_out_methods, - inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, - inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, - inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, - event_value_in_value_out_methods=event_value_in_value_out_methods, - event_value_in_stream_out_methods=event_value_in_stream_out_methods, - event_stream_in_value_out_methods=event_stream_in_value_out_methods, - event_stream_in_stream_out_methods=event_stream_in_stream_out_methods) - - class _ServiceAssembly(interfaces.Server): def __init__(self, implementations, fore_link): @@ -215,7 +163,8 @@ class _ServiceAssembly(interfaces.Server): def _start(self): with self._lock: self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - servicer = _servicer(self._implementations, self._pool) + servicer = face_implementations.servicer( + self._pool, self._implementations, None) self._back = tickets_implementations.back( servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS, _ONE_DAY_IN_SECONDS) @@ -251,7 +200,7 @@ class _ServiceAssembly(interfaces.Server): def assemble_face_stub(activated_rear_link): - """Assembles a face_interfaces.Stub. + """Assembles a face_interfaces.GenericStub. The returned object is a context manager and may only be used in context to invoke RPCs. @@ -262,12 +211,12 @@ def assemble_face_stub(activated_rear_link): when passed to this method. Returns: - A face_interfaces.Stub on which, in context, RPCs can be invoked. + A face_interfaces.GenericStub on which, in context, RPCs can be invoked. """ return _FaceStub(activated_rear_link) -def assemble_dynamic_inline_stub(implementations, activated_rear_link): +def assemble_dynamic_inline_stub(cardinalities, activated_rear_link): """Assembles a stub with method names for attributes. The returned object is a context manager and may only be used in context to @@ -276,29 +225,27 @@ def assemble_dynamic_inline_stub(implementations, activated_rear_link): The returned object, when used in context, will respond to attribute access as follows: if the requested attribute is the name of a unary-unary RPC method, the value of the attribute will be a - face_interfaces.UnaryUnarySyncAsync with which to invoke the RPC method. If - the requested attribute is the name of a unary-stream RPC method, the value - of the attribute will be a callable with the semantics of - face_interfaces.Stub.inline_value_in_stream_out, minus the "name" parameter, + face_interfaces.UnaryUnaryMultiCallable with which to invoke the RPC method. + If the requested attribute is the name of a unary-stream RPC method, the + value of the attribute will be a face_interfaces.UnaryStreamMultiCallable with which to invoke the RPC method. If the requested attribute is the name of a stream-unary RPC method, the value of the attribute will be a - face_interfaces.StreamUnarySyncAsync with which to invoke the RPC method. If - the requested attribute is the name of a stream-stream RPC method, the value - of the attribute will be a callable with the semantics of - face_interfaces.Stub.inline_stream_in_stream_out, minus the "name" parameter, + face_interfaces.StreamUnaryMultiCallable with which to invoke the RPC method. + If the requested attribute is the name of a stream-stream RPC method, the + value of the attribute will be a face_interfaces.StreamStreamMultiCallable with which to invoke the RPC method. Args: - implementations: A dictionary from RPC method name to - interfaces.MethodImplementation. + cardinalities: A dictionary from RPC method name to cardinality.Cardinality + value identifying the cardinality of the named RPC method. activated_rear_link: An object that is both a tickets_interfaces.RearLink and an activated.Activated. The object should be in the inactive state when passed to this method. Returns: - A stub on which, in context, RPCs can be invoked. + A face_interfaces.DynamicStub on which, in context, RPCs can be invoked. """ - return _DynamicInlineStub(implementations, activated_rear_link) + return _DynamicInlineStub(cardinalities, activated_rear_link) def assemble_service(implementations, activated_fore_link): @@ -306,7 +253,7 @@ def assemble_service(implementations, activated_fore_link): Args: implementations: A dictionary from RPC method name to - interfaces.MethodImplementation. + face_interfaces.MethodImplementation. activated_fore_link: An object that is both a tickets_interfaces.ForeLink and an activated.Activated. The object should be in the inactive state when passed to this method. diff --git a/src/python/src/grpc/framework/assembly/implementations_test.py b/src/python/src/grpc/framework/assembly/implementations_test.py index 74dc02ed83d..5540edff7ad 100644 --- a/src/python/src/grpc/framework/assembly/implementations_test.py +++ b/src/python/src/grpc/framework/assembly/implementations_test.py @@ -35,11 +35,11 @@ import threading import unittest from grpc.framework.assembly import implementations -from grpc.framework.assembly import utilities from grpc.framework.base import interfaces from grpc.framework.base.packets import packets as tickets from grpc.framework.base.packets import interfaces as tickets_interfaces from grpc.framework.base.packets import null +from grpc.framework.face import utilities as face_utilities from grpc.framework.foundation import logging_pool from grpc._junkdrawer import math_pb2 @@ -81,12 +81,16 @@ def _sum(request_iterator, unused_context): _IMPLEMENTATIONS = { - DIV: utilities.unary_unary_inline(_div), - DIV_MANY: utilities.stream_stream_inline(_div_many), - FIB: utilities.unary_stream_inline(_fib), - SUM: utilities.stream_unary_inline(_sum), + DIV: face_utilities.unary_unary_inline(_div), + DIV_MANY: face_utilities.stream_stream_inline(_div_many), + FIB: face_utilities.unary_stream_inline(_fib), + SUM: face_utilities.stream_unary_inline(_sum), } +_CARDINALITIES = { + name: implementation.cardinality + for name, implementation in _IMPLEMENTATIONS.iteritems()} + _TIMEOUT = 10 @@ -170,8 +174,8 @@ class FaceStubTest(unittest.TestCase): face_stub = implementations.assemble_face_stub(pipe) with service, face_stub: - sync_async = face_stub.stream_unary_sync_async(SUM) - response_future = sync_async.async( + multi_callable = face_stub.stream_unary_multi_callable(SUM) + response_future = multi_callable.future( (math_pb2.Num(num=index) for index in range(stream_length)), _TIMEOUT) self.assertEqual( @@ -214,7 +218,7 @@ class DynamicInlineStubTest(unittest.TestCase): pipe = PipeLink() service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) dynamic_stub = implementations.assemble_dynamic_inline_stub( - _IMPLEMENTATIONS, pipe) + _CARDINALITIES, pipe) service.start() with dynamic_stub: @@ -229,7 +233,7 @@ class DynamicInlineStubTest(unittest.TestCase): pipe = PipeLink() service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) dynamic_stub = implementations.assemble_dynamic_inline_stub( - _IMPLEMENTATIONS, pipe) + _CARDINALITIES, pipe) with service, dynamic_stub: response_iterator = dynamic_stub.Fib( @@ -244,10 +248,10 @@ class DynamicInlineStubTest(unittest.TestCase): pipe = PipeLink() service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) dynamic_stub = implementations.assemble_dynamic_inline_stub( - _IMPLEMENTATIONS, pipe) + _CARDINALITIES, pipe) with service, dynamic_stub: - response_future = dynamic_stub.Sum.async( + response_future = dynamic_stub.Sum.future( (math_pb2.Num(num=index) for index in range(stream_length)), _TIMEOUT) self.assertEqual( @@ -261,7 +265,7 @@ class DynamicInlineStubTest(unittest.TestCase): pipe = PipeLink() service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) dynamic_stub = implementations.assemble_dynamic_inline_stub( - _IMPLEMENTATIONS, pipe) + _CARDINALITIES, pipe) with service, dynamic_stub: response_iterator = dynamic_stub.DivMany( diff --git a/src/python/src/grpc/framework/assembly/interfaces.py b/src/python/src/grpc/framework/assembly/interfaces.py index d1a6aad29e7..c469dc4fd2b 100644 --- a/src/python/src/grpc/framework/assembly/interfaces.py +++ b/src/python/src/grpc/framework/assembly/interfaces.py @@ -33,63 +33,7 @@ import abc -# cardinality, style, and stream are referenced from specification in this -# module. -from grpc.framework.common import cardinality # pylint: disable=unused-import -from grpc.framework.common import style # pylint: disable=unused-import from grpc.framework.foundation import activated -from grpc.framework.foundation import stream # pylint: disable=unused-import - - -class MethodImplementation(object): - """A sum type that describes an RPC method implementation. - - Attributes: - cardinality: A cardinality.Cardinality value. - style: A style.Service value. - unary_unary_inline: The implementation of the RPC method as a callable - value that takes a request value and a face_interfaces.RpcContext object - and returns a response value. Only non-None if cardinality is - cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE. - unary_stream_inline: The implementation of the RPC method as a callable - value that takes a request value and a face_interfaces.RpcContext object - and returns an iterator of response values. Only non-None if cardinality - is cardinality.Cardinality.UNARY_STREAM and style is - style.Service.INLINE. - stream_unary_inline: The implementation of the RPC method as a callable - value that takes an iterator of request values and a - face_interfaces.RpcContext object and returns a response value. Only - non-None if cardinality is cardinality.Cardinality.STREAM_UNARY and style - is style.Service.INLINE. - stream_stream_inline: The implementation of the RPC method as a callable - value that takes an iterator of request values and a - face_interfaces.RpcContext object and returns an iterator of response - values. Only non-None if cardinality is - cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE. - unary_unary_event: The implementation of the RPC method as a callable value - that takes a request value, a response callback to which to pass the - response value of the RPC, and a face_interfaces.RpcContext. Only - non-None if cardinality is cardinality.Cardinality.UNARY_UNARY and style - is style.Service.EVENT. - unary_stream_event: The implementation of the RPC method as a callable - value that takes a request value, a stream.Consumer to which to pass the - the response values of the RPC, and a face_interfaces.RpcContext. Only - non-None if cardinality is cardinality.Cardinality.UNARY_STREAM and style - is style.Service.EVENT. - stream_unary_event: The implementation of the RPC method as a callable - value that takes a response callback to which to pass the response value - of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer - to which the request values of the RPC should be passed. Only non-None if - cardinality is cardinality.Cardinality.STREAM_UNARY and style is - style.Service.EVENT. - stream_stream_event: The implementation of the RPC method as a callable - value that takes a stream.Consumer to which to pass the response values - of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer - to which the request values of the RPC should be passed. Only non-None if - cardinality is cardinality.Cardinality.STREAM_STREAM and style is - style.Service.EVENT. - """ - __metaclass__ = abc.ABCMeta class Server(activated.Activated): diff --git a/src/python/src/grpc/framework/assembly/utilities.py b/src/python/src/grpc/framework/assembly/utilities.py deleted file mode 100644 index 80e7f59c03c..00000000000 --- a/src/python/src/grpc/framework/assembly/utilities.py +++ /dev/null @@ -1,179 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Utilities for assembling RPC framework values.""" - -import collections - -from grpc.framework.assembly import interfaces -from grpc.framework.common import cardinality -from grpc.framework.common import style -from grpc.framework.face import interfaces as face_interfaces -from grpc.framework.foundation import stream - - -class _MethodImplementation( - interfaces.MethodImplementation, - collections.namedtuple( - '_MethodImplementation', - ['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline', - 'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event', - 'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])): - pass - - -def unary_unary_inline(behavior): - """Creates an interfaces.MethodImplementation for the given behavior. - - Args: - behavior: The implementation of a unary-unary RPC method as a callable value - that takes a request value and a face_interfaces.RpcContext object and - returns a response value. - - Returns: - An interfaces.MethodImplementation derived from the given behavior. - """ - return _MethodImplementation( - cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior, - None, None, None, None, None, None, None) - - -def unary_stream_inline(behavior): - """Creates an interfaces.MethodImplementation for the given behavior. - - Args: - behavior: The implementation of a unary-stream RPC method as a callable - value that takes a request value and a face_interfaces.RpcContext object - and returns an iterator of response values. - - Returns: - An interfaces.MethodImplementation derived from the given behavior. - """ - return _MethodImplementation( - cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None, - behavior, None, None, None, None, None, None) - - -def stream_unary_inline(behavior): - """Creates an interfaces.MethodImplementation for the given behavior. - - Args: - behavior: The implementation of a stream-unary RPC method as a callable - value that takes an iterator of request values and a - face_interfaces.RpcContext object and returns a response value. - - Returns: - An interfaces.MethodImplementation derived from the given behavior. - """ - return _MethodImplementation( - cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None, - behavior, None, None, None, None, None) - - -def stream_stream_inline(behavior): - """Creates an interfaces.MethodImplementation for the given behavior. - - Args: - behavior: The implementation of a stream-stream RPC method as a callable - value that takes an iterator of request values and a - face_interfaces.RpcContext object and returns an iterator of response - values. - - Returns: - An interfaces.MethodImplementation derived from the given behavior. - """ - return _MethodImplementation( - cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None, - None, behavior, None, None, None, None) - - -def unary_unary_event(behavior): - """Creates an interfaces.MethodImplementation for the given behavior. - - Args: - behavior: The implementation of a unary-unary RPC method as a callable - value that takes a request value, a response callback to which to pass - the response value of the RPC, and a face_interfaces.RpcContext. - - Returns: - An interfaces.MethodImplementation derived from the given behavior. - """ - return _MethodImplementation( - cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None, - None, None, behavior, None, None, None) - - -def unary_stream_event(behavior): - """Creates an interfaces.MethodImplementation for the given behavior. - - Args: - behavior: The implementation of a unary-stream RPC method as a callable - value that takes a request value, a stream.Consumer to which to pass the - the response values of the RPC, and a face_interfaces.RpcContext. - - Returns: - An interfaces.MethodImplementation derived from the given behavior. - """ - return _MethodImplementation( - cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None, - None, None, None, behavior, None, None) - - -def stream_unary_event(behavior): - """Creates an interfaces.MethodImplementation for the given behavior. - - Args: - behavior: The implementation of a stream-unary RPC method as a callable - value that takes a response callback to which to pass the response value - of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer - to which the request values of the RPC should be passed. - - Returns: - An interfaces.MethodImplementation derived from the given behavior. - """ - return _MethodImplementation( - cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None, - None, None, None, None, behavior, None) - - -def stream_stream_event(behavior): - """Creates an interfaces.MethodImplementation for the given behavior. - - Args: - behavior: The implementation of a stream-stream RPC method as a callable - value that takes a stream.Consumer to which to pass the response values - of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer - to which the request values of the RPC should be passed. - - Returns: - An interfaces.MethodImplementation derived from the given behavior. - """ - return _MethodImplementation( - cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None, - None, None, None, None, None, behavior) diff --git a/src/python/src/grpc/framework/face/_service.py b/src/python/src/grpc/framework/face/_service.py index 26bde129687..cdf413356ad 100644 --- a/src/python/src/grpc/framework/face/_service.py +++ b/src/python/src/grpc/framework/face/_service.py @@ -105,15 +105,14 @@ def adapt_inline_value_in_value_out(method): def adaptation(response_consumer, operation_context): rpc_context = _control.RpcContext(operation_context) return stream_util.TransformingConsumer( - lambda request: method.service(request, rpc_context), response_consumer) + lambda request: method(request, rpc_context), response_consumer) return adaptation def adapt_inline_value_in_stream_out(method): def adaptation(response_consumer, operation_context): rpc_context = _control.RpcContext(operation_context) - return _ValueInStreamOutConsumer( - method.service, rpc_context, response_consumer) + return _ValueInStreamOutConsumer(method, rpc_context, response_consumer) return adaptation @@ -123,7 +122,7 @@ def adapt_inline_stream_in_value_out(method, pool): operation_context.add_termination_callback(rendezvous.set_outcome) def in_pool_thread(): response_consumer.consume_and_terminate( - method.service(rendezvous, _control.RpcContext(operation_context))) + method(rendezvous, _control.RpcContext(operation_context))) pool.submit(_pool_wrap(in_pool_thread, operation_context)) return rendezvous return adaptation @@ -149,7 +148,7 @@ def adapt_inline_stream_in_stream_out(method, pool): operation_context.add_termination_callback(rendezvous.set_outcome) def in_pool_thread(): _control.pipe_iterator_to_consumer( - method.service(rendezvous, _control.RpcContext(operation_context)), + method(rendezvous, _control.RpcContext(operation_context)), response_consumer, operation_context.is_active, True) pool.submit(_pool_wrap(in_pool_thread, operation_context)) return rendezvous @@ -159,7 +158,7 @@ def adapt_inline_stream_in_stream_out(method, pool): def adapt_event_value_in_value_out(method): def adaptation(response_consumer, operation_context): def on_payload(payload): - method.service( + method( payload, response_consumer.consume_and_terminate, _control.RpcContext(operation_context)) return _control.UnaryConsumer(on_payload) @@ -169,7 +168,7 @@ def adapt_event_value_in_value_out(method): def adapt_event_value_in_stream_out(method): def adaptation(response_consumer, operation_context): def on_payload(payload): - method.service( + method( payload, response_consumer, _control.RpcContext(operation_context)) return _control.UnaryConsumer(on_payload) return adaptation @@ -178,12 +177,11 @@ def adapt_event_value_in_stream_out(method): def adapt_event_stream_in_value_out(method): def adaptation(response_consumer, operation_context): rpc_context = _control.RpcContext(operation_context) - return method.service(response_consumer.consume_and_terminate, rpc_context) + return method(response_consumer.consume_and_terminate, rpc_context) return adaptation def adapt_event_stream_in_stream_out(method): def adaptation(response_consumer, operation_context): - return method.service( - response_consumer, _control.RpcContext(operation_context)) + return method(response_consumer, _control.RpcContext(operation_context)) return adaptation diff --git a/src/python/src/grpc/framework/face/_test_case.py b/src/python/src/grpc/framework/face/_test_case.py index a4e17c464ce..b3a012db001 100644 --- a/src/python/src/grpc/framework/face/_test_case.py +++ b/src/python/src/grpc/framework/face/_test_case.py @@ -42,37 +42,17 @@ class FaceTestCase(test_case.FaceTestCase): """Provides abstract Face-layer tests an in-memory implementation.""" def set_up_implementation( - self, - name, - methods, - inline_value_in_value_out_methods, - inline_value_in_stream_out_methods, - inline_stream_in_value_out_methods, - inline_stream_in_stream_out_methods, - event_value_in_value_out_methods, - event_value_in_stream_out_methods, - event_stream_in_value_out_methods, - event_stream_in_stream_out_methods, - multi_method): + self, name, methods, method_implementations, + multi_method_implementation): servicer_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) stub_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) servicer = implementations.servicer( - servicer_pool, - inline_value_in_value_out_methods=inline_value_in_value_out_methods, - inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, - inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, - inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, - event_value_in_value_out_methods=event_value_in_value_out_methods, - event_value_in_stream_out_methods=event_value_in_stream_out_methods, - event_stream_in_value_out_methods=event_stream_in_value_out_methods, - event_stream_in_stream_out_methods=event_stream_in_stream_out_methods, - multi_method=multi_method) + servicer_pool, method_implementations, multi_method_implementation) linked_pair = base_util.linked_pair(servicer, _TIMEOUT) - server = implementations.server() - stub = implementations.stub(linked_pair.front, stub_pool) - return server, stub, (servicer_pool, stub_pool, linked_pair) + stub = implementations.generic_stub(linked_pair.front, stub_pool) + return stub, (servicer_pool, stub_pool, linked_pair) def tear_down_implementation(self, memo): servicer_pool, stub_pool, linked_pair = memo diff --git a/src/python/src/grpc/framework/face/implementations.py b/src/python/src/grpc/framework/face/implementations.py index 86948b386fa..e8d91a3c91c 100644 --- a/src/python/src/grpc/framework/face/implementations.py +++ b/src/python/src/grpc/framework/face/implementations.py @@ -29,6 +29,8 @@ """Entry points into the Face layer of RPC Framework.""" +from grpc.framework.common import cardinality +from grpc.framework.common import style from grpc.framework.base import exceptions as _base_exceptions from grpc.framework.base import interfaces as base_interfaces from grpc.framework.face import _calls @@ -56,7 +58,7 @@ class _BaseServicer(base_interfaces.Servicer): raise _base_exceptions.NoSuchMethodError() -class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync): +class _UnaryUnaryMultiCallable(interfaces.UnaryUnaryMultiCallable): def __init__(self, front, name): self._front = front @@ -66,12 +68,33 @@ class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync): return _calls.blocking_value_in_value_out( self._front, self._name, request, timeout, 'unused trace ID') - def async(self, request, timeout): + def future(self, request, timeout): return _calls.future_value_in_value_out( self._front, self._name, request, timeout, 'unused trace ID') + def event(self, request, response_callback, abortion_callback, timeout): + return _calls.event_value_in_value_out( + self._front, self._name, request, response_callback, abortion_callback, + timeout, 'unused trace ID') + + +class _UnaryStreamMultiCallable(interfaces.UnaryStreamMultiCallable): + + def __init__(self, front, name): + self._front = front + self._name = name -class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync): + def __call__(self, request, timeout): + return _calls.inline_value_in_stream_out( + self._front, self._name, request, timeout, 'unused trace ID') + + def event(self, request, response_consumer, abortion_callback, timeout): + return _calls.event_value_in_stream_out( + self._front, self._name, request, response_consumer, abortion_callback, + timeout, 'unused trace ID') + + +class _StreamUnaryMultiCallable(interfaces.StreamUnaryMultiCallable): def __init__(self, front, name, pool): self._front = front @@ -82,18 +105,37 @@ class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync): return _calls.blocking_stream_in_value_out( self._front, self._name, request_iterator, timeout, 'unused trace ID') - def async(self, request_iterator, timeout): + def future(self, request_iterator, timeout): return _calls.future_stream_in_value_out( self._front, self._name, request_iterator, timeout, 'unused trace ID', self._pool) + def event(self, response_callback, abortion_callback, timeout): + return _calls.event_stream_in_value_out( + self._front, self._name, response_callback, abortion_callback, timeout, + 'unused trace ID') -class _Server(interfaces.Server): - """An interfaces.Server implementation.""" +class _StreamStreamMultiCallable(interfaces.StreamStreamMultiCallable): -class _Stub(interfaces.Stub): - """An interfaces.Stub implementation.""" + def __init__(self, front, name, pool): + self._front = front + self._name = name + self._pool = pool + + def __call__(self, request_iterator, timeout): + return _calls.inline_stream_in_stream_out( + self._front, self._name, request_iterator, timeout, 'unused trace ID', + self._pool) + + def event(self, response_consumer, abortion_callback, timeout): + return _calls.event_stream_in_stream_out( + self._front, self._name, response_consumer, abortion_callback, timeout, + 'unused trace ID') + + +class _GenericStub(interfaces.GenericStub): + """An interfaces.GenericStub implementation.""" def __init__(self, front, pool): self._front = front @@ -149,136 +191,128 @@ class _Stub(interfaces.Stub): self._front, name, response_consumer, abortion_callback, timeout, 'unused trace ID') - def unary_unary_sync_async(self, name): - return _UnaryUnarySyncAsync(self._front, name) - - def stream_unary_sync_async(self, name): - return _StreamUnarySyncAsync(self._front, name, self._pool) - - -def _aggregate_methods( - pool, - inline_value_in_value_out_methods, - inline_value_in_stream_out_methods, - inline_stream_in_value_out_methods, - inline_stream_in_stream_out_methods, - event_value_in_value_out_methods, - event_value_in_stream_out_methods, - event_stream_in_value_out_methods, - event_stream_in_stream_out_methods): - """Aggregates methods coded in according to different interfaces.""" - methods = {} - - def adapt_unpooled_methods(adapted_methods, unadapted_methods, adaptation): - if unadapted_methods is not None: - for name, unadapted_method in unadapted_methods.iteritems(): - adapted_methods[name] = adaptation(unadapted_method) - - def adapt_pooled_methods(adapted_methods, unadapted_methods, adaptation): - if unadapted_methods is not None: - for name, unadapted_method in unadapted_methods.iteritems(): - adapted_methods[name] = adaptation(unadapted_method, pool) - - adapt_unpooled_methods( - methods, inline_value_in_value_out_methods, - _service.adapt_inline_value_in_value_out) - adapt_unpooled_methods( - methods, inline_value_in_stream_out_methods, - _service.adapt_inline_value_in_stream_out) - adapt_pooled_methods( - methods, inline_stream_in_value_out_methods, - _service.adapt_inline_stream_in_value_out) - adapt_pooled_methods( - methods, inline_stream_in_stream_out_methods, - _service.adapt_inline_stream_in_stream_out) - adapt_unpooled_methods( - methods, event_value_in_value_out_methods, - _service.adapt_event_value_in_value_out) - adapt_unpooled_methods( - methods, event_value_in_stream_out_methods, - _service.adapt_event_value_in_stream_out) - adapt_unpooled_methods( - methods, event_stream_in_value_out_methods, - _service.adapt_event_stream_in_value_out) - adapt_unpooled_methods( - methods, event_stream_in_stream_out_methods, - _service.adapt_event_stream_in_stream_out) - - return methods - - -def servicer( - pool, - inline_value_in_value_out_methods=None, - inline_value_in_stream_out_methods=None, - inline_stream_in_value_out_methods=None, - inline_stream_in_stream_out_methods=None, - event_value_in_value_out_methods=None, - event_value_in_stream_out_methods=None, - event_stream_in_value_out_methods=None, - event_stream_in_stream_out_methods=None, - multi_method=None): + def unary_unary_multi_callable(self, name): + return _UnaryUnaryMultiCallable(self._front, name) + + def unary_stream_multi_callable(self, name): + return _UnaryStreamMultiCallable(self._front, name) + + def stream_unary_multi_callable(self, name): + return _StreamUnaryMultiCallable(self._front, name, self._pool) + + def stream_stream_multi_callable(self, name): + return _StreamStreamMultiCallable(self._front, name, self._pool) + + +class _DynamicStub(interfaces.DynamicStub): + """An interfaces.DynamicStub implementation.""" + + def __init__(self, cardinalities, front, pool): + self._cardinalities = cardinalities + self._front = front + self._pool = pool + + def __getattr__(self, attr): + cardinality = self._cardinalities.get(attr) + if cardinality is cardinality.Cardinality.UNARY_UNARY: + return _UnaryUnaryMultiCallable(self._front, attr) + elif cardinality is cardinality.Cardinality.UNARY_STREAM: + return _UnaryStreamMultiCallable(self._front, attr) + elif cardinality is cardinality.Cardinality.STREAM_UNARY: + return _StreamUnaryMultiCallable(self._front, attr, self._pool) + elif cardinality is cardinality.Cardinality.STREAM_STREAM: + return _StreamStreamMultiCallable(self._front, attr, self._pool) + else: + raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr) + + +def _adapt_method_implementations(method_implementations, pool): + adapted_implementations = {} + for name, method_implementation in method_implementations.iteritems(): + if method_implementation.style is style.Service.INLINE: + if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: + adapted_implementations[name] = _service.adapt_inline_value_in_value_out( + method_implementation.unary_unary_inline) + elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: + adapted_implementations[name] = _service.adapt_inline_value_in_stream_out( + method_implementation.unary_stream_inline) + elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: + adapted_implementations[name] = _service.adapt_inline_stream_in_value_out( + method_implementation.stream_unary_inline, pool) + elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: + adapted_implementations[name] = _service.adapt_inline_stream_in_stream_out( + method_implementation.stream_stream_inline, pool) + elif method_implementation.style is style.Service.EVENT: + if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: + adapted_implementations[name] = _service.adapt_event_value_in_value_out( + method_implementation.unary_unary_event) + elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: + adapted_implementations[name] = _service.adapt_event_value_in_stream_out( + method_implementation.unary_stream_event) + elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: + adapted_implementations[name] = _service.adapt_event_stream_in_value_out( + method_implementation.stream_unary_event) + elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: + adapted_implementations[name] = _service.adapt_event_stream_in_stream_out( + method_implementation.stream_stream_event) + return adapted_implementations + + +def servicer(pool, method_implementations, multi_method_implementation): """Creates a base_interfaces.Servicer. - The key sets of the passed dictionaries must be disjoint. It is guaranteed - that any passed MultiMethod implementation will only be called to service an - RPC if the RPC method name is not present in the key sets of the passed - dictionaries. + It is guaranteed that any passed interfaces.MultiMethodImplementation will + only be called to service an RPC if there is no + interfaces.MethodImplementation for the RPC method in the passed + method_implementations dictionary. Args: pool: A thread pool. - inline_value_in_value_out_methods: A dictionary mapping method names to - interfaces.InlineValueInValueOutMethod implementations. - inline_value_in_stream_out_methods: A dictionary mapping method names to - interfaces.InlineValueInStreamOutMethod implementations. - inline_stream_in_value_out_methods: A dictionary mapping method names to - interfaces.InlineStreamInValueOutMethod implementations. - inline_stream_in_stream_out_methods: A dictionary mapping method names to - interfaces.InlineStreamInStreamOutMethod implementations. - event_value_in_value_out_methods: A dictionary mapping method names to - interfaces.EventValueInValueOutMethod implementations. - event_value_in_stream_out_methods: A dictionary mapping method names to - interfaces.EventValueInStreamOutMethod implementations. - event_stream_in_value_out_methods: A dictionary mapping method names to - interfaces.EventStreamInValueOutMethod implementations. - event_stream_in_stream_out_methods: A dictionary mapping method names to - interfaces.EventStreamInStreamOutMethod implementations. - multi_method: An implementation of interfaces.MultiMethod. + method_implementations: A dictionary from RPC method name to + interfaces.MethodImplementation object to be used to service the named + RPC method. + multi_method_implementation: An interfaces.MultiMethodImplementation to be + used to service any RPCs not serviced by the + interfaces.MethodImplementations given in the method_implementations + dictionary, or None. Returns: A base_interfaces.Servicer that services RPCs via the given implementations. """ - methods = _aggregate_methods( - pool, - inline_value_in_value_out_methods, - inline_value_in_stream_out_methods, - inline_stream_in_value_out_methods, - inline_stream_in_stream_out_methods, - event_value_in_value_out_methods, - event_value_in_stream_out_methods, - event_stream_in_value_out_methods, - event_stream_in_stream_out_methods) + adapted_implementations = _adapt_method_implementations( + method_implementations, pool) + return _BaseServicer(adapted_implementations, multi_method_implementation) - return _BaseServicer(methods, multi_method) +def generic_stub(front, pool): + """Creates an interfaces.GenericStub. -def server(): - """Creates an interfaces.Server. + Args: + front: A base_interfaces.Front. + pool: A futures.ThreadPoolExecutor. Returns: - An interfaces.Server. + An interfaces.GenericStub that performs RPCs via the given + base_interfaces.Front. """ - return _Server() + return _GenericStub(front, pool) -def stub(front, pool): - """Creates an interfaces.Stub. +def dynamic_stub(cardinalities, front, pool, prefix): + """Creates an interfaces.DynamicStub. Args: + cardinalities: A dict from RPC method name to cardinality.Cardinality + value identifying the cardinality of every RPC method to be supported by + the created interfaces.DynamicStub. front: A base_interfaces.Front. pool: A futures.ThreadPoolExecutor. + prefix: A string to prepend when mapping requested attribute name to RPC + method name during attribute access on the created + interfaces.DynamicStub. Returns: - An interfaces.Stub that performs RPCs via the given base_interfaces.Front. + An interfaces.DynamicStub that performs RPCs via the given + base_interfaces.Front. """ - return _Stub(front, pool) + return _DynamicStub(cardinalities, front, pool, prefix) diff --git a/src/python/src/grpc/framework/face/interfaces.py b/src/python/src/grpc/framework/face/interfaces.py index 9e19106e6f3..b7cc4c1169a 100644 --- a/src/python/src/grpc/framework/face/interfaces.py +++ b/src/python/src/grpc/framework/face/interfaces.py @@ -32,11 +32,24 @@ import abc import enum -# exceptions, abandonment, and future are referenced from specification in this -# module. +# cardinality, style, exceptions, abandonment, future, and stream are +# referenced from specification in this module. +from grpc.framework.common import cardinality # pylint: disable=unused-import +from grpc.framework.common import style # pylint: disable=unused-import from grpc.framework.face import exceptions # pylint: disable=unused-import from grpc.framework.foundation import abandonment # pylint: disable=unused-import from grpc.framework.foundation import future # pylint: disable=unused-import +from grpc.framework.foundation import stream # pylint: disable=unused-import + + +@enum.unique +class Abortion(enum.Enum): + """Categories of RPC abortion.""" + CANCELLED = 'cancelled' + EXPIRED = 'expired' + NETWORK_FAILURE = 'network failure' + SERVICED_FAILURE = 'serviced failure' + SERVICER_FAILURE = 'servicer failure' class CancellableIterator(object): @@ -59,69 +72,61 @@ class CancellableIterator(object): raise NotImplementedError() -class UnaryUnarySyncAsync(object): - """Affords invoking a unary-unary RPC synchronously or asynchronously. - - Values implementing this interface are directly callable and present an - "async" method. Both calls take a request value and a numeric timeout. - Direct invocation of a value of this type invokes its associated RPC and - blocks until the RPC's response is available. Calling the "async" method - of a value of this type invokes its associated RPC and immediately returns a - future.Future bound to the asynchronous execution of the RPC. - """ +class RpcContext(object): + """Provides RPC-related information and action.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def __call__(self, request, timeout): - """Synchronously invokes the underlying RPC. + def is_active(self): + """Describes whether the RPC is active or has terminated.""" + raise NotImplementedError() - Args: - request: The request value for the RPC. - timeout: A duration of time in seconds to allow for the RPC. + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the RPC. Returns: - The response value for the RPC. - - Raises: - exceptions.RpcError: Indicating that the RPC was aborted. + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have timed + out. """ raise NotImplementedError() @abc.abstractmethod - def async(self, request, timeout): - """Asynchronously invokes the underlying RPC. + def add_abortion_callback(self, abortion_callback): + """Registers a callback to be called if the RPC is aborted. Args: - request: The request value for the RPC. - timeout: A duration of time in seconds to allow for the RPC. - - Returns: - A future.Future representing the RPC. In the event of RPC completion, the - returned Future's result value will be the response value of the RPC. - In the event of RPC abortion, the returned Future's exception value - will be an exceptions.RpcError. + abortion_callback: A callable to be called and passed an Abortion value + in the event of RPC abortion. """ raise NotImplementedError() -class StreamUnarySyncAsync(object): - """Affords invoking a stream-unary RPC synchronously or asynchronously. +class Call(object): + """Invocation-side representation of an RPC. - Values implementing this interface are directly callable and present an - "async" method. Both calls take an iterator of request values and a numeric - timeout. Direct invocation of a value of this type invokes its associated RPC - and blocks until the RPC's response is available. Calling the "async" method - of a value of this type invokes its associated RPC and immediately returns a - future.Future bound to the asynchronous execution of the RPC. + Attributes: + context: An RpcContext affording information about the RPC. """ __metaclass__ = abc.ABCMeta @abc.abstractmethod - def __call__(self, request_iterator, timeout): + def cancel(self): + """Requests cancellation of the RPC.""" + raise NotImplementedError() + + +class UnaryUnaryMultiCallable(object): + """Affords invoking a unary-unary RPC in any call style.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __call__(self, request, timeout): """Synchronously invokes the underlying RPC. Args: - request_iterator: An iterator that yields request values for the RPC. + request: The request value for the RPC. timeout: A duration of time in seconds to allow for the RPC. Returns: @@ -133,11 +138,11 @@ class StreamUnarySyncAsync(object): raise NotImplementedError() @abc.abstractmethod - def async(self, request, timeout): + def future(self, request, timeout): """Asynchronously invokes the underlying RPC. Args: - request_iterator: An iterator that yields request values for the RPC. + request: The request value for the RPC. timeout: A duration of time in seconds to allow for the RPC. Returns: @@ -148,248 +153,204 @@ class StreamUnarySyncAsync(object): """ raise NotImplementedError() - -@enum.unique -class Abortion(enum.Enum): - """Categories of RPC abortion.""" - - CANCELLED = 'cancelled' - EXPIRED = 'expired' - NETWORK_FAILURE = 'network failure' - SERVICED_FAILURE = 'serviced failure' - SERVICER_FAILURE = 'servicer failure' - - -class RpcContext(object): - """Provides RPC-related information and action.""" - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def is_active(self): - """Describes whether the RPC is active or has terminated.""" - raise NotImplementedError() - - @abc.abstractmethod - def time_remaining(self): - """Describes the length of allowed time remaining for the RPC. - - Returns: - A nonnegative float indicating the length of allowed time in seconds - remaining for the RPC to complete before it is considered to have timed - out. - """ - raise NotImplementedError() - @abc.abstractmethod - def add_abortion_callback(self, abortion_callback): - """Registers a callback to be called if the RPC is aborted. + def event(self, request, response_callback, abortion_callback, timeout): + """Asynchronously invokes the underlying RPC. Args: - abortion_callback: A callable to be called and passed an Abortion value + request: The request value for the RPC. + response_callback: A callback to be called to accept the restponse value + of the RPC. + abortion_callback: A callback to be called and passed an Abortion value in the event of RPC abortion. - """ - raise NotImplementedError() - - -class InlineValueInValueOutMethod(object): - """A type for inline unary-request-unary-response RPC methods.""" - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def service(self, request, context): - """Services an RPC that accepts one value and produces one value. - - Args: - request: The single request value for the RPC. - context: An RpcContext object. + timeout: A duration of time in seconds to allow for the RPC. Returns: - The single response value for the RPC. - - Raises: - abandonment.Abandoned: If no response is necessary because the RPC has - been aborted. + A Call object for the RPC. """ raise NotImplementedError() -class InlineValueInStreamOutMethod(object): - """A type for inline unary-request-stream-response RPC methods.""" +class UnaryStreamMultiCallable(object): + """Affords invoking a unary-stream RPC in any call style.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def service(self, request, context): - """Services an RPC that accepts one value and produces a stream of values. + def __call__(self, request, timeout): + """Synchronously invokes the underlying RPC. Args: - request: The single request value for the RPC. - context: An RpcContext object. - - Yields: - The values that comprise the response stream of the RPC. + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. - Raises: - abandonment.Abandoned: If completing the response stream is not necessary - because the RPC has been aborted. + Returns: + A CancellableIterator that yields the response values of the RPC and + affords RPC cancellation. Drawing response values from the returned + CancellableIterator may raise exceptions.RpcError indicating abortion + of the RPC. """ raise NotImplementedError() - -class InlineStreamInValueOutMethod(object): - """A type for inline stream-request-unary-response RPC methods.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod - def service(self, request_iterator, context): - """Services an RPC that accepts a stream of values and produces one value. + def event(self, request, response_consumer, abortion_callback, timeout): + """Asynchronously invokes the underlying RPC. Args: - request_iterator: An iterator that yields the request values of the RPC. - Drawing values from this iterator may also raise exceptions.RpcError to - indicate abortion of the RPC. - context: An RpcContext object. - - Yields: - The values that comprise the response stream of the RPC. + request: The request value for the RPC. + response_consumer: A stream.Consumer to be called to accept the restponse + values of the RPC. + abortion_callback: A callback to be called and passed an Abortion value + in the event of RPC abortion. + timeout: A duration of time in seconds to allow for the RPC. - Raises: - abandonment.Abandoned: If no response is necessary because the RPC has - been aborted. - exceptions.RpcError: Implementations of this method must not deliberately - raise exceptions.RpcError but may allow such errors raised by the - request_iterator passed to them to propagate through their bodies - uncaught. + Returns: + A Call object for the RPC. """ raise NotImplementedError() -class InlineStreamInStreamOutMethod(object): - """A type for inline stream-request-stream-response RPC methods.""" +class StreamUnaryMultiCallable(object): + """Affords invoking a stream-unary RPC in any call style.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def service(self, request_iterator, context): - """Services an RPC that accepts and produces streams of values. + def __call__(self, request_iterator, timeout): + """Synchronously invokes the underlying RPC. Args: - request_iterator: An iterator that yields the request values of the RPC. - Drawing values from this iterator may also raise exceptions.RpcError to - indicate abortion of the RPC. - context: An RpcContext object. + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. - Yields: - The values that comprise the response stream of the RPC. + Returns: + The response value for the RPC. Raises: - abandonment.Abandoned: If completing the response stream is not necessary - because the RPC has been aborted. - exceptions.RpcError: Implementations of this method must not deliberately - raise exceptions.RpcError but may allow such errors raised by the - request_iterator passed to them to propagate through their bodies - uncaught. + exceptions.RpcError: Indicating that the RPC was aborted. """ raise NotImplementedError() - -class EventValueInValueOutMethod(object): - """A type for event-driven unary-request-unary-response RPC methods.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod - def service(self, request, response_callback, context): - """Services an RPC that accepts one value and produces one value. + def future(self, request_iterator, timeout): + """Asynchronously invokes the underlying RPC. Args: - request: The single request value for the RPC. - response_callback: A callback to be called to accept the response value of - the RPC. - context: An RpcContext object. + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future's result value will be the response value of the RPC. + In the event of RPC abortion, the returned Future's exception value + will be an exceptions.RpcError. """ raise NotImplementedError() - -class EventValueInStreamOutMethod(object): - """A type for event-driven unary-request-stream-response RPC methods.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod - def service(self, request, response_consumer, context): - """Services an RPC that accepts one value and produces a stream of values. + def event(self, response_callback, abortion_callback, timeout): + """Asynchronously invokes the underlying RPC. Args: - request: The single request value for the RPC. - response_consumer: A stream.Consumer to be called to accept the response - values of the RPC. - context: An RpcContext object. + request: The request value for the RPC. + response_callback: A callback to be called to accept the restponse value + of the RPC. + abortion_callback: A callback to be called and passed an Abortion value + in the event of RPC abortion. + timeout: A duration of time in seconds to allow for the RPC. - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. + Returns: + A pair of a Call object for the RPC and a stream.Consumer to which the + request values of the RPC should be passed. """ raise NotImplementedError() -class EventStreamInValueOutMethod(object): - """A type for event-driven stream-request-unary-response RPC methods.""" +class StreamStreamMultiCallable(object): + """Affords invoking a stream-stream RPC in any call style.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def service(self, response_callback, context): - """Services an RPC that accepts a stream of values and produces one value. + def __call__(self, request_iterator, timeout): + """Synchronously invokes the underlying RPC. Args: - response_callback: A callback to be called to accept the response value of - the RPC. - context: An RpcContext object. + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. Returns: - A stream.Consumer with which to accept the request values of the RPC. The - consumer returned from this method may or may not be invoked to - completion: in the case of RPC abortion, RPC Framework will simply stop - passing values to this object. Implementations must not assume that this - object will be called to completion of the request stream or even called - at all. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. + A CancellableIterator that yields the response values of the RPC and + affords RPC cancellation. Drawing response values from the returned + CancellableIterator may raise exceptions.RpcError indicating abortion + of the RPC. """ raise NotImplementedError() - -class EventStreamInStreamOutMethod(object): - """A type for event-driven stream-request-stream-response RPC methods.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod - def service(self, response_consumer, context): - """Services an RPC that accepts and produces streams of values. + def event(self, response_consumer, abortion_callback, timeout): + """Asynchronously invokes the underlying RPC. - Args: - response_consumer: A stream.Consumer to be called to accept the response +l Args: + response_consumer: A stream.Consumer to be called to accept the restponse values of the RPC. - context: An RpcContext object. + abortion_callback: A callback to be called and passed an Abortion value + in the event of RPC abortion. + timeout: A duration of time in seconds to allow for the RPC. Returns: - A stream.Consumer with which to accept the request values of the RPC. The - consumer returned from this method may or may not be invoked to - completion: in the case of RPC abortion, RPC Framework will simply stop - passing values to this object. Implementations must not assume that this - object will be called to completion of the request stream or even called - at all. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. + A pair of a Call object for the RPC and a stream.Consumer to which the + request values of the RPC should be passed. """ raise NotImplementedError() -class MultiMethod(object): +class MethodImplementation(object): + """A sum type that describes an RPC method implementation. + + Attributes: + cardinality: A cardinality.Cardinality value. + style: A style.Service value. + unary_unary_inline: The implementation of the RPC method as a callable + value that takes a request value and an RpcContext object and returns a + response value. Only non-None if cardinality is + cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE. + unary_stream_inline: The implementation of the RPC method as a callable + value that takes a request value and an RpcContext object and returns an + iterator of response values. Only non-None if cardinality is + cardinality.Cardinality.UNARY_STREAM and style is style.Service.INLINE. + stream_unary_inline: The implementation of the RPC method as a callable + value that takes an iterator of request values and an RpcContext object + and returns a response value. Only non-None if cardinality is + cardinality.Cardinality.STREAM_UNARY and style is style.Service.INLINE. + stream_stream_inline: The implementation of the RPC method as a callable + value that takes an iterator of request values and an RpcContext object + and returns an iterator of response values. Only non-None if cardinality + is cardinality.Cardinality.STREAM_STREAM and style is + style.Service.INLINE. + unary_unary_event: The implementation of the RPC method as a callable value + that takes a request value, a response callback to which to pass the + response value of the RPC, and an RpcContext. Only non-None if + cardinality is cardinality.Cardinality.UNARY_UNARY and style is + style.Service.EVENT. + unary_stream_event: The implementation of the RPC method as a callable + value that takes a request value, a stream.Consumer to which to pass the + the response values of the RPC, and an RpcContext. Only non-None if + cardinality is cardinality.Cardinality.UNARY_STREAM and style is + style.Service.EVENT. + stream_unary_event: The implementation of the RPC method as a callable + value that takes a response callback to which to pass the response value + of the RPC and an RpcContext and returns a stream.Consumer to which the + request values of the RPC should be passed. Only non-None if cardinality + is cardinality.Cardinality.STREAM_UNARY and style is style.Service.EVENT. + stream_stream_event: The implementation of the RPC method as a callable + value that takes a stream.Consumer to which to pass the response values + of the RPC and an RpcContext and returns a stream.Consumer to which the + request values of the RPC should be passed. Only non-None if cardinality + is cardinality.Cardinality.STREAM_STREAM and style is + style.Service.EVENT. + """ + __metaclass__ = abc.ABCMeta + + +class MultiMethodImplementation(object): """A general type able to service many RPC methods.""" __metaclass__ = abc.ABCMeta @@ -420,26 +381,7 @@ class MultiMethod(object): raise NotImplementedError() -class Server(object): - """Specification of a running server that services RPCs.""" - __metaclass__ = abc.ABCMeta - - -class Call(object): - """Invocation-side representation of an RPC. - - Attributes: - context: An RpcContext affording information about the RPC. - """ - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def cancel(self): - """Requests cancellation of the RPC.""" - raise NotImplementedError() - - -class Stub(object): +class GenericStub(object): """Affords RPC methods to callers.""" __metaclass__ = abc.ABCMeta @@ -632,25 +574,67 @@ class Stub(object): raise NotImplementedError() @abc.abstractmethod - def unary_unary_sync_async(self, name): - """Creates a UnaryUnarySyncAsync value for a unary-unary RPC method. + def unary_unary_multi_callable(self, name): + """Creates a UnaryUnaryMultiCallable for a unary-unary RPC method. + + Args: + name: The RPC method name. + + Returns: + A UnaryUnaryMultiCallable value for the named unary-unary RPC method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_stream_multi_callable(self, name): + """Creates a UnaryStreamMultiCallable for a unary-stream RPC method. Args: name: The RPC method name. Returns: - A UnaryUnarySyncAsync value for the named unary-unary RPC method. + A UnaryStreamMultiCallable value for the name unary-stream RPC method. """ raise NotImplementedError() @abc.abstractmethod - def stream_unary_sync_async(self, name): - """Creates a StreamUnarySyncAsync value for a stream-unary RPC method. + def stream_unary_multi_callable(self, name): + """Creates a StreamUnaryMultiCallable for a stream-unary RPC method. Args: name: The RPC method name. Returns: - A StreamUnarySyncAsync value for the named stream-unary RPC method. + A StreamUnaryMultiCallable value for the named stream-unary RPC method. """ raise NotImplementedError() + + @abc.abstractmethod + def stream_stream_multi_callable(self, name): + """Creates a StreamStreamMultiCallable for a stream-stream RPC method. + + Args: + name: The RPC method name. + + Returns: + A StreamStreamMultiCallable value for the named stream-stream RPC method. + """ + raise NotImplementedError() + + +class DynamicStub(object): + """A stub with RPC-method-bound multi-callable attributes. + + Instances of this type responsd to attribute access as follows: if the + requested attribute is the name of a unary-unary RPC method, the value of the + attribute will be a UnaryUnaryMultiCallable with which to invoke the RPC + method; if the requested attribute is the name of a unary-stream RPC method, + the value of the attribute will be a UnaryStreamMultiCallable with which to + invoke the RPC method; if the requested attribute is the name of a + stream-unary RPC method, the value of the attribute will be a + StreamUnaryMultiCallable with which to invoke the RPC method; and if the + requested attribute is the name of a stream-stream RPC method, the value of + the attribute will be a StreamStreamMultiCallable with which to invoke the + RPC method. + """ + __metaclass__ = abc.ABCMeta diff --git a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py index 233486f2110..e57ee001045 100644 --- a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py +++ b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -61,13 +61,9 @@ class BlockingInvocationInlineServiceTestCase( self.digest = digest.digest( stock_service.STOCK_TEST_SERVICE, self.control, None) - self.server, self.stub, self.memo = self.set_up_implementation( + self.stub, self.memo = self.set_up_implementation( self.digest.name, self.digest.methods, - self.digest.inline_unary_unary_methods, - self.digest.inline_unary_stream_methods, - self.digest.inline_stream_unary_methods, - self.digest.inline_stream_stream_methods, - {}, {}, {}, {}, None) + self.digest.inline_method_implementations, None) def tearDown(self): """See unittest.TestCase.tearDown for full specification. @@ -147,8 +143,8 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): - sync_async = self.stub.unary_unary_sync_async(name) - sync_async(request, _TIMEOUT) + multi_callable = self.stub.unary_unary_multi_callable(name) + multi_callable(request, _TIMEOUT) def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -170,8 +166,8 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): - sync_async = self.stub.stream_unary_sync_async(name) - sync_async(iter(requests), _TIMEOUT) + multi_callable = self.stub.stream_unary_multi_callable(name) + multi_callable(iter(requests), _TIMEOUT) def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( diff --git a/src/python/src/grpc/framework/face/testing/digest.py b/src/python/src/grpc/framework/face/testing/digest.py index b8fb5733013..db8fcbb0184 100644 --- a/src/python/src/grpc/framework/face/testing/digest.py +++ b/src/python/src/grpc/framework/face/testing/digest.py @@ -34,6 +34,8 @@ import threading # testing_control, interfaces, and testing_service are referenced from # specification in this module. +from grpc.framework.common import cardinality +from grpc.framework.common import style from grpc.framework.face import exceptions from grpc.framework.face import interfaces as face_interfaces from grpc.framework.face.testing import control as testing_control # pylint: disable=unused-import @@ -50,15 +52,9 @@ class TestServiceDigest( 'TestServiceDigest', ['name', 'methods', - 'inline_unary_unary_methods', - 'inline_unary_stream_methods', - 'inline_stream_unary_methods', - 'inline_stream_stream_methods', - 'event_unary_unary_methods', - 'event_unary_stream_methods', - 'event_stream_unary_methods', - 'event_stream_stream_methods', - 'multi_method', + 'inline_method_implementations', + 'event_method_implementations', + 'multi_method_implementation', 'unary_unary_messages_sequences', 'unary_stream_messages_sequences', 'stream_unary_messages_sequences', @@ -69,32 +65,14 @@ class TestServiceDigest( name: The RPC service name to be used in the test. methods: A sequence of interfaces.Method objects describing the RPC methods that will be called during the test. - inline_unary_unary_methods: A dict from method name to - face_interfaces.InlineValueInValueOutMethod object to be used in tests of + inline_method_implementations: A dict from RPC method name to + face_interfaces.MethodImplementation object to be used in tests of in-line calls to behaviors under test. - inline_unary_stream_methods: A dict from method name to - face_interfaces.InlineValueInStreamOutMethod object to be used in tests of - in-line calls to behaviors under test. - inline_stream_unary_methods: A dict from method name to - face_interfaces.InlineStreamInValueOutMethod object to be used in tests of - in-line calls to behaviors under test. - inline_stream_stream_methods: A dict from method name to - face_interfaces.InlineStreamInStreamOutMethod object to be used in tests - of in-line calls to behaviors under test. - event_unary_unary_methods: A dict from method name to - face_interfaces.EventValueInValueOutMethod object to be used in tests of - event-driven calls to behaviors under test. - event_unary_stream_methods: A dict from method name to - face_interfaces.EventValueInStreamOutMethod object to be used in tests of - event-driven calls to behaviors under test. - event_stream_unary_methods: A dict from method name to - face_interfaces.EventStreamInValueOutMethod object to be used in tests of + event_method_implementations: A dict from RPC method name to + face_interfaces.MethodImplementation object to be used in tests of event-driven calls to behaviors under test. - event_stream_stream_methods: A dict from method name to - face_interfaces.EventStreamInStreamOutMethod object to be used in tests of - event-driven calls to behaviors under test. - multi_method: A face_interfaces.MultiMethod to be used in tests of generic - calls to behaviors under test. + multi_method_implementation: A face_interfaces.MultiMethodImplementation to + be used in tests of generic calls to behaviors under test. unary_unary_messages_sequences: A dict from method name to sequence of service.UnaryUnaryTestMessages objects to be used to test the method with the given name. @@ -130,27 +108,33 @@ class _BufferingConsumer(stream.Consumer): self.terminated = True -class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod): +class _InlineUnaryUnaryMethod(face_interfaces.MethodImplementation): def __init__(self, unary_unary_test_method, control): self._test_method = unary_unary_test_method self._control = control - def service(self, request, context): + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.INLINE + + def unary_unary_inline(self, request, context): response_list = [] self._test_method.service( request, response_list.append, context, self._control) return response_list.pop(0) -class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod): +class _EventUnaryUnaryMethod(face_interfaces.MethodImplementation): def __init__(self, unary_unary_test_method, control, pool): self._test_method = unary_unary_test_method self._control = control self._pool = pool - def service(self, request, response_callback, context): + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.EVENT + + def unary_unary_event(self, request, response_callback, context): if self._pool is None: self._test_method.service( request, response_callback, context, self._control) @@ -160,13 +144,16 @@ class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod): self._control) -class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod): +class _InlineUnaryStreamMethod(face_interfaces.MethodImplementation): def __init__(self, unary_stream_test_method, control): self._test_method = unary_stream_test_method self._control = control - def service(self, request, context): + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.INLINE + + def unary_stream_inline(self, request, context): response_consumer = _BufferingConsumer() self._test_method.service( request, response_consumer, context, self._control) @@ -174,14 +161,17 @@ class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod): yield response -class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod): +class _EventUnaryStreamMethod(face_interfaces.MethodImplementation): def __init__(self, unary_stream_test_method, control, pool): self._test_method = unary_stream_test_method self._control = control self._pool = pool - def service(self, request, response_consumer, context): + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.EVENT + + def unary_stream_event(self, request, response_consumer, context): if self._pool is None: self._test_method.service( request, response_consumer, context, self._control) @@ -191,13 +181,16 @@ class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod): self._control) -class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod): +class _InlineStreamUnaryMethod(face_interfaces.MethodImplementation): def __init__(self, stream_unary_test_method, control): self._test_method = stream_unary_test_method self._control = control - def service(self, request_iterator, context): + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.INLINE + + def stream_unary_inline(self, request_iterator, context): response_list = [] request_consumer = self._test_method.service( response_list.append, context, self._control) @@ -207,14 +200,17 @@ class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod): return response_list.pop(0) -class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod): +class _EventStreamUnaryMethod(face_interfaces.MethodImplementation): def __init__(self, stream_unary_test_method, control, pool): self._test_method = stream_unary_test_method self._control = control self._pool = pool - def service(self, response_callback, context): + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.EVENT + + def stream_unary_event(self, response_callback, context): request_consumer = self._test_method.service( response_callback, context, self._control) if self._pool is None: @@ -223,13 +219,16 @@ class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod): return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) -class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod): +class _InlineStreamStreamMethod(face_interfaces.MethodImplementation): def __init__(self, stream_stream_test_method, control): self._test_method = stream_stream_test_method self._control = control - def service(self, request_iterator, context): + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.INLINE + + def stream_stream_inline(self, request_iterator, context): response_consumer = _BufferingConsumer() request_consumer = self._test_method.service( response_consumer, context, self._control) @@ -241,14 +240,17 @@ class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod): response_consumer.terminate() -class _EventStreamStreamMethod(face_interfaces.EventStreamInStreamOutMethod): +class _EventStreamStreamMethod(face_interfaces.MethodImplementation): def __init__(self, stream_stream_test_method, control, pool): self._test_method = stream_stream_test_method self._control = control self._pool = pool - def service(self, response_consumer, context): + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.EVENT + + def stream_stream_event(self, response_consumer, context): request_consumer = self._test_method.service( response_consumer, context, self._control) if self._pool is None: @@ -332,7 +334,7 @@ class _StreamUnaryAdaptation(object): response_consumer.consume_and_terminate, context, control) -class _MultiMethod(face_interfaces.MultiMethod): +class _MultiMethodImplementation(face_interfaces.MultiMethodImplementation): def __init__(self, methods, control, pool): self._methods = methods @@ -427,19 +429,21 @@ def digest(service, control, pool): adaptations.update(unary_stream.adaptations) adaptations.update(stream_unary.adaptations) adaptations.update(stream_stream.adaptations) + inlines = dict(unary_unary.inlines) + inlines.update(unary_stream.inlines) + inlines.update(stream_unary.inlines) + inlines.update(stream_stream.inlines) + events = dict(unary_unary.events) + events.update(unary_stream.events) + events.update(stream_unary.events) + events.update(stream_stream.events) return TestServiceDigest( service.name(), methods, - unary_unary.inlines, - unary_stream.inlines, - stream_unary.inlines, - stream_stream.inlines, - unary_unary.events, - unary_stream.events, - stream_unary.events, - stream_stream.events, - _MultiMethod(adaptations, control, pool), + inlines, + events, + _MultiMethodImplementation(adaptations, control, pool), unary_unary.messages, unary_stream.messages, stream_unary.messages, diff --git a/src/python/src/grpc/framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/src/grpc/framework/face/testing/event_invocation_synchronous_event_service_test_case.py index 21e669b9080..0f0b0e3d523 100644 --- a/src/python/src/grpc/framework/face/testing/event_invocation_synchronous_event_service_test_case.py +++ b/src/python/src/grpc/framework/face/testing/event_invocation_synchronous_event_service_test_case.py @@ -60,14 +60,9 @@ class EventInvocationSynchronousEventServiceTestCase( self.digest = digest.digest( stock_service.STOCK_TEST_SERVICE, self.control, None) - self.server, self.stub, self.memo = self.set_up_implementation( + self.stub, self.memo = self.set_up_implementation( self.digest.name, self.digest.methods, - {}, {}, {}, {}, - self.digest.event_unary_unary_methods, - self.digest.event_unary_stream_methods, - self.digest.event_stream_unary_methods, - self.digest.event_stream_stream_methods, - None) + self.digest.event_method_implementations, None) def tearDown(self): """See unittest.TestCase.tearDown for full specification. diff --git a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index c87846f2ef6..0d51b64f1b3 100644 --- a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -91,14 +91,9 @@ class FutureInvocationAsynchronousEventServiceTestCase( self.digest = digest.digest( stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool) - self.server, self.stub, self.memo = self.set_up_implementation( + self.stub, self.memo = self.set_up_implementation( self.digest.name, self.digest.methods, - {}, {}, {}, {}, - self.digest.event_unary_unary_methods, - self.digest.event_unary_stream_methods, - self.digest.event_stream_unary_methods, - self.digest.event_stream_stream_methods, - None) + self.digest.event_method_implementations, None) def tearDown(self): """See unittest.TestCase.tearDown for full specification. @@ -190,8 +185,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( request = test_messages.request() with self.control.pause(): - sync_async = self.stub.unary_unary_sync_async(name) - response_future = sync_async.async(request, _TIMEOUT) + multi_callable = self.stub.unary_unary_multi_callable(name) + response_future = multi_callable.future(request, _TIMEOUT) self.assertIsInstance( response_future.exception(), exceptions.ExpirationError) with self.assertRaises(exceptions.ExpirationError): @@ -216,8 +211,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( requests = test_messages.requests() with self.control.pause(): - sync_async = self.stub.stream_unary_sync_async(name) - response_future = sync_async.async(iter(requests), _TIMEOUT) + multi_callable = self.stub.stream_unary_multi_callable(name) + response_future = multi_callable.future(iter(requests), _TIMEOUT) self.assertIsInstance( response_future.exception(), exceptions.ExpirationError) with self.assertRaises(exceptions.ExpirationError): diff --git a/src/python/src/grpc/framework/face/testing/service.py b/src/python/src/grpc/framework/face/testing/service.py index a58e2ee42e4..bf54d41d664 100644 --- a/src/python/src/grpc/framework/face/testing/service.py +++ b/src/python/src/grpc/framework/face/testing/service.py @@ -36,8 +36,8 @@ from grpc.framework.face import interfaces as face_interfaces # pylint: disable from grpc.framework.face.testing import interfaces -class UnaryUnaryTestMethod(interfaces.Method): - """Like face_interfaces.EventValueInValueOutMethod but with a control.""" +class UnaryUnaryTestMethodImplementation(interfaces.Method): + """A controllable implementation of a unary-unary RPC method.""" __metaclass__ = abc.ABCMeta @@ -93,8 +93,8 @@ class UnaryUnaryTestMessages(object): raise NotImplementedError() -class UnaryStreamTestMethod(interfaces.Method): - """Like face_interfaces.EventValueInStreamOutMethod but with a control.""" +class UnaryStreamTestMethodImplementation(interfaces.Method): + """A controllable implementation of a unary-stream RPC method.""" __metaclass__ = abc.ABCMeta @@ -106,7 +106,7 @@ class UnaryStreamTestMethod(interfaces.Method): request: The single request message for the RPC. response_consumer: A stream.Consumer to be called to accept the response messages of the RPC. - context: An RpcContext object. + context: A face_interfaces.RpcContext object. control: A test_control.Control to control execution of this method. Raises: @@ -150,8 +150,8 @@ class UnaryStreamTestMessages(object): raise NotImplementedError() -class StreamUnaryTestMethod(interfaces.Method): - """Like face_interfaces.EventStreamInValueOutMethod but with a control.""" +class StreamUnaryTestMethodImplementation(interfaces.Method): + """A controllable implementation of a stream-unary RPC method.""" __metaclass__ = abc.ABCMeta @@ -162,7 +162,7 @@ class StreamUnaryTestMethod(interfaces.Method): Args: response_callback: A callback to be called to accept the response message of the RPC. - context: An RpcContext object. + context: A face_interfaces.RpcContext object. control: A test_control.Control to control execution of this method. Returns: @@ -214,8 +214,8 @@ class StreamUnaryTestMessages(object): raise NotImplementedError() -class StreamStreamTestMethod(interfaces.Method): - """Like face_interfaces.EventStreamInStreamOutMethod but with a control.""" +class StreamStreamTestMethodImplementation(interfaces.Method): + """A controllable implementation of a stream-stream RPC method.""" __metaclass__ = abc.ABCMeta @@ -226,7 +226,7 @@ class StreamStreamTestMethod(interfaces.Method): Args: response_consumer: A stream.Consumer to be called to accept the response messages of the RPC. - context: An RpcContext object. + context: A face_interfaces.RpcContext object. control: A test_control.Control to control execution of this method. Returns: @@ -298,8 +298,8 @@ class TestService(object): Returns: A dict from method name to pair. The first element of the pair - is a UnaryUnaryTestMethod object and the second element is a sequence - of UnaryUnaryTestMethodMessages objects. + is a UnaryUnaryTestMethodImplementation object and the second element + is a sequence of UnaryUnaryTestMethodMessages objects. """ raise NotImplementedError() @@ -309,8 +309,8 @@ class TestService(object): Returns: A dict from method name to pair. The first element of the pair is a - UnaryStreamTestMethod object and the second element is a sequence of - UnaryStreamTestMethodMessages objects. + UnaryStreamTestMethodImplementation object and the second element is a + sequence of UnaryStreamTestMethodMessages objects. """ raise NotImplementedError() @@ -320,8 +320,8 @@ class TestService(object): Returns: A dict from method name to pair. The first element of the pair is a - StreamUnaryTestMethod object and the second element is a sequence of - StreamUnaryTestMethodMessages objects. + StreamUnaryTestMethodImplementation object and the second element is a + sequence of StreamUnaryTestMethodMessages objects. """ raise NotImplementedError() @@ -331,7 +331,7 @@ class TestService(object): Returns: A dict from method name to pair. The first element of the pair is a - StreamStreamTestMethod object and the second element is a sequence of - StreamStreamTestMethodMessages objects. + StreamStreamTestMethodImplementation object and the second element is a + sequence of StreamStreamTestMethodMessages objects. """ raise NotImplementedError() diff --git a/src/python/src/grpc/framework/face/testing/stock_service.py b/src/python/src/grpc/framework/face/testing/stock_service.py index 83c9418b074..61aaf444a00 100644 --- a/src/python/src/grpc/framework/face/testing/stock_service.py +++ b/src/python/src/grpc/framework/face/testing/stock_service.py @@ -139,7 +139,7 @@ def _get_highest_trade_price(stock_reply_callback, control, active): return StockRequestConsumer() -class GetLastTradePrice(service.UnaryUnaryTestMethod): +class GetLastTradePrice(service.UnaryUnaryTestMethodImplementation): """GetLastTradePrice for use in tests.""" def name(self): @@ -186,7 +186,7 @@ class GetLastTradePriceMessages(service.UnaryUnaryTestMessages): test_case.assertEqual(_price(request.symbol), response.price) -class GetLastTradePriceMultiple(service.StreamStreamTestMethod): +class GetLastTradePriceMultiple(service.StreamStreamTestMethodImplementation): """GetLastTradePriceMultiple for use in tests.""" def name(self): @@ -238,7 +238,7 @@ class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages): test_case.assertEqual(_price(stock_request.symbol), stock_reply.price) -class WatchFutureTrades(service.UnaryStreamTestMethod): +class WatchFutureTrades(service.UnaryStreamTestMethodImplementation): """WatchFutureTrades for use in tests.""" def name(self): @@ -288,7 +288,7 @@ class WatchFutureTradesMessages(service.UnaryStreamTestMessages): test_case.assertEqual(base_price + index, response.price) -class GetHighestTradePrice(service.StreamUnaryTestMethod): +class GetHighestTradePrice(service.StreamUnaryTestMethodImplementation): """GetHighestTradePrice for use in tests.""" def name(self): diff --git a/src/python/src/grpc/framework/face/testing/test_case.py b/src/python/src/grpc/framework/face/testing/test_case.py index 218a2a8549b..e60e3d1d405 100644 --- a/src/python/src/grpc/framework/face/testing/test_case.py +++ b/src/python/src/grpc/framework/face/testing/test_case.py @@ -46,55 +46,24 @@ class FaceTestCase(object): @abc.abstractmethod def set_up_implementation( - self, - name, - methods, - inline_value_in_value_out_methods, - inline_value_in_stream_out_methods, - inline_stream_in_value_out_methods, - inline_stream_in_stream_out_methods, - event_value_in_value_out_methods, - event_value_in_stream_out_methods, - event_stream_in_value_out_methods, - event_stream_in_stream_out_methods, - multi_method): + self, name, methods, method_implementations, + multi_method_implementation): """Instantiates the Face Layer implementation under test. Args: name: The service name to be used in the test. methods: A sequence of interfaces.Method objects describing the RPC methods that will be called during the test. - inline_value_in_value_out_methods: A dictionary from string method names - to face_interfaces.InlineValueInValueOutMethod implementations of those - methods. - inline_value_in_stream_out_methods: A dictionary from string method names - to face_interfaces.InlineValueInStreamOutMethod implementations of those - methods. - inline_stream_in_value_out_methods: A dictionary from string method names - to face_interfaces.InlineStreamInValueOutMethod implementations of those - methods. - inline_stream_in_stream_out_methods: A dictionary from string method names - to face_interfaces.InlineStreamInStreamOutMethod implementations of - those methods. - event_value_in_value_out_methods: A dictionary from string method names - to face_interfaces.EventValueInValueOutMethod implementations of those - methods. - event_value_in_stream_out_methods: A dictionary from string method names - to face_interfaces.EventValueInStreamOutMethod implementations of those - methods. - event_stream_in_value_out_methods: A dictionary from string method names - to face_interfaces.EventStreamInValueOutMethod implementations of those - methods. - event_stream_in_stream_out_methods: A dictionary from string method names - to face_interfaces.EventStreamInStreamOutMethod implementations of those - methods. - multi_method: An face_interfaces.MultiMethod, or None. + method_implementations: A dictionary from string RPC method name to + face_interfaces.MethodImplementation object specifying + implementation of an RPC method. + multi_method_implementation: An face_interfaces.MultiMethodImplementation + or None. Returns: - A sequence of length three the first element of which is a - face_interfaces.Server, the second element of which is a - face_interfaces.Stub, (both of which are backed by the given method - implementations), and the third element of which is an arbitrary memo + A sequence of length two the first element of which is a + face_interfaces.GenericStub (backed by the given method + implementations), and the second element of which is an arbitrary memo object to be kept and passed to tearDownImplementation at the conclusion of the test. """ @@ -105,7 +74,7 @@ class FaceTestCase(object): """Destroys the Face layer implementation under test. Args: - memo: The object from the third position of the return value of + memo: The object from the second position of the return value of set_up_implementation. """ raise NotImplementedError() diff --git a/src/python/src/grpc/framework/face/utilities.py b/src/python/src/grpc/framework/face/utilities.py index 5e34be37da7..a63fe8c60df 100644 --- a/src/python/src/grpc/framework/face/utilities.py +++ b/src/python/src/grpc/framework/face/utilities.py @@ -27,101 +27,44 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Utilities for the face layer of RPC Framework.""" +"""Utilities for RPC framework's face layer.""" -# stream is referenced from specification in this module. -from grpc.framework.face import interfaces -from grpc.framework.foundation import stream # pylint: disable=unused-import - - -class _InlineUnaryUnaryMethod(interfaces.InlineValueInValueOutMethod): - - def __init__(self, behavior): - self._behavior = behavior - - def service(self, request, context): - return self._behavior(request, context) - - -class _InlineUnaryStreamMethod(interfaces.InlineValueInStreamOutMethod): - - def __init__(self, behavior): - self._behavior = behavior - - def service(self, request, context): - return self._behavior(request, context) - - -class _InlineStreamUnaryMethod(interfaces.InlineStreamInValueOutMethod): - - def __init__(self, behavior): - self._behavior = behavior - - def service(self, request_iterator, context): - return self._behavior(request_iterator, context) - - -class _InlineStreamStreamMethod(interfaces.InlineStreamInStreamOutMethod): - - def __init__(self, behavior): - self._behavior = behavior - - def service(self, request_iterator, context): - return self._behavior(request_iterator, context) +import collections +from grpc.framework.common import cardinality +from grpc.framework.common import style +from grpc.framework.face import interfaces +from grpc.framework.foundation import stream -class _EventUnaryUnaryMethod(interfaces.EventValueInValueOutMethod): - - def __init__(self, behavior): - self._behavior = behavior - - def service(self, request, response_callback, context): - return self._behavior(request, response_callback, context) - - -class _EventUnaryStreamMethod(interfaces.EventValueInStreamOutMethod): - - def __init__(self, behavior): - self._behavior = behavior - - def service(self, request, response_consumer, context): - return self._behavior(request, response_consumer, context) - - -class _EventStreamUnaryMethod(interfaces.EventStreamInValueOutMethod): - - def __init__(self, behavior): - self._behavior = behavior - - def service(self, response_callback, context): - return self._behavior(response_callback, context) - - -class _EventStreamStreamMethod(interfaces.EventStreamInStreamOutMethod): - - def __init__(self, behavior): - self._behavior = behavior - def service(self, response_consumer, context): - return self._behavior(response_consumer, context) +class _MethodImplementation( + interfaces.MethodImplementation, + collections.namedtuple( + '_MethodImplementation', + ['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline', + 'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event', + 'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])): + pass -def inline_unary_unary_method(behavior): - """Creates an interfaces.InlineValueInValueOutMethod from a behavior. +def unary_unary_inline(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. Args: - behavior: The implementation of a unary-unary RPC method as a callable - value that takes a request value and an interfaces.RpcContext object and + behavior: The implementation of a unary-unary RPC method as a callable value + that takes a request value and an interfaces.RpcContext object and returns a response value. Returns: - An interfaces.InlineValueInValueOutMethod derived from the given behavior. + An interfaces.MethodImplementation derived from the given behavior. """ - return _InlineUnaryUnaryMethod(behavior) + return _MethodImplementation( + cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior, + None, None, None, None, None, None, None) -def inline_unary_stream_method(behavior): - """Creates an interfaces.InlineValueInStreamOutMethod from a behavior. +def unary_stream_inline(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. Args: behavior: The implementation of a unary-stream RPC method as a callable @@ -129,13 +72,15 @@ def inline_unary_stream_method(behavior): returns an iterator of response values. Returns: - An interfaces.InlineValueInStreamOutMethod derived from the given behavior. + An interfaces.MethodImplementation derived from the given behavior. """ - return _InlineUnaryStreamMethod(behavior) + return _MethodImplementation( + cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None, + behavior, None, None, None, None, None, None) -def inline_stream_unary_method(behavior): - """Creates an interfaces.InlineStreamInValueOutMethod from a behavior. +def stream_unary_inline(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. Args: behavior: The implementation of a stream-unary RPC method as a callable @@ -143,13 +88,15 @@ def inline_stream_unary_method(behavior): interfaces.RpcContext object and returns a response value. Returns: - An interfaces.InlineStreamInValueOutMethod derived from the given behavior. + An interfaces.MethodImplementation derived from the given behavior. """ - return _InlineStreamUnaryMethod(behavior) + return _MethodImplementation( + cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None, + behavior, None, None, None, None, None) -def inline_stream_stream_method(behavior): - """Creates an interfaces.InlineStreamInStreamOutMethod from a behavior. +def stream_stream_inline(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. Args: behavior: The implementation of a stream-stream RPC method as a callable @@ -157,14 +104,15 @@ def inline_stream_stream_method(behavior): interfaces.RpcContext object and returns an iterator of response values. Returns: - An interfaces.InlineStreamInStreamOutMethod derived from the given - behavior. + An interfaces.MethodImplementation derived from the given behavior. """ - return _InlineStreamStreamMethod(behavior) + return _MethodImplementation( + cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None, + None, behavior, None, None, None, None) -def event_unary_unary_method(behavior): - """Creates an interfaces.EventValueInValueOutMethod from a behavior. +def unary_unary_event(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. Args: behavior: The implementation of a unary-unary RPC method as a callable @@ -172,27 +120,31 @@ def event_unary_unary_method(behavior): the response value of the RPC, and an interfaces.RpcContext. Returns: - An interfaces.EventValueInValueOutMethod derived from the given behavior. + An interfaces.MethodImplementation derived from the given behavior. """ - return _EventUnaryUnaryMethod(behavior) + return _MethodImplementation( + cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None, + None, None, behavior, None, None, None) -def event_unary_stream_method(behavior): - """Creates an interfaces.EventValueInStreamOutMethod from a behavior. +def unary_stream_event(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. Args: behavior: The implementation of a unary-stream RPC method as a callable value that takes a request value, a stream.Consumer to which to pass the - response values of the RPC, and an interfaces.RpcContext. + the response values of the RPC, and an interfaces.RpcContext. Returns: - An interfaces.EventValueInStreamOutMethod derived from the given behavior. + An interfaces.MethodImplementation derived from the given behavior. """ - return _EventUnaryStreamMethod(behavior) + return _MethodImplementation( + cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None, + None, None, None, behavior, None, None) -def event_stream_unary_method(behavior): - """Creates an interfaces.EventStreamInValueOutMethod from a behavior. +def stream_unary_event(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. Args: behavior: The implementation of a stream-unary RPC method as a callable @@ -201,13 +153,15 @@ def event_stream_unary_method(behavior): which the request values of the RPC should be passed. Returns: - An interfaces.EventStreamInValueOutMethod derived from the given behavior. + An interfaces.MethodImplementation derived from the given behavior. """ - return _EventStreamUnaryMethod(behavior) + return _MethodImplementation( + cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None, + None, None, None, None, behavior, None) -def event_stream_stream_method(behavior): - """Creates an interfaces.EventStreamInStreamOutMethod from a behavior. +def stream_stream_event(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. Args: behavior: The implementation of a stream-stream RPC method as a callable @@ -216,6 +170,8 @@ def event_stream_stream_method(behavior): which the request values of the RPC should be passed. Returns: - An interfaces.EventStreamInStreamOutMethod derived from the given behavior. + An interfaces.MethodImplementation derived from the given behavior. """ - return _EventStreamStreamMethod(behavior) + return _MethodImplementation( + cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None, + None, None, None, None, None, behavior)