From 9280790a5d5817855463d8a940d386954dc62ce1 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Wed, 25 Feb 2015 17:14:11 +0000 Subject: [PATCH] Final changes to the early_adopter API. This makes grpc.early_adopter much more independent of RPC Framework and cleaner at the cost of reexporting most of the interfaces and writing several delegation classes. --- src/python/interop/interop/methods.py | 34 +-- .../grpc/early_adopter/_assembly_utilities.py | 168 +++++++++++ .../src/grpc/early_adopter/_face_utilities.py | 178 ------------ .../src/grpc/early_adopter/_reexport.py | 207 +++++++++++++ .../src/grpc/early_adopter/exceptions.py | 48 ++++ .../src/grpc/early_adopter/implementations.py | 145 +++++++--- .../early_adopter/implementations_test.py | 176 ++++++++++++ .../src/grpc/early_adopter/interfaces.py | 271 +++++++++++++++--- .../src/grpc/early_adopter/utilities.py | 132 ++++----- tools/run_tests/run_python.sh | 1 + 10 files changed, 1013 insertions(+), 347 deletions(-) create mode 100644 src/python/src/grpc/early_adopter/_assembly_utilities.py delete mode 100644 src/python/src/grpc/early_adopter/_face_utilities.py create mode 100644 src/python/src/grpc/early_adopter/_reexport.py create mode 100644 src/python/src/grpc/early_adopter/exceptions.py create mode 100644 src/python/src/grpc/early_adopter/implementations_test.py diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py index 26c1869f93e..6d5990087ee 100644 --- a/src/python/interop/interop/methods.py +++ b/src/python/interop/interop/methods.py @@ -34,47 +34,47 @@ from grpc.early_adopter import utilities from interop import empty_pb2 from interop import messages_pb2 -def _empty_call(request): +def _empty_call(request, unused_context): return empty_pb2.Empty() -_CLIENT_EMPTY_CALL = utilities.unary_unary_client_rpc_method( +_CLIENT_EMPTY_CALL = utilities.unary_unary_invocation_description( empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString) -_SERVER_EMPTY_CALL = utilities.unary_unary_server_rpc_method( +_SERVER_EMPTY_CALL = utilities.unary_unary_service_description( _empty_call, empty_pb2.Empty.FromString, empty_pb2.Empty.SerializeToString) -def _unary_call(request): +def _unary_call(request, unused_context): return messages_pb2.SimpleResponse( payload=messages_pb2.Payload( type=messages_pb2.COMPRESSABLE, body=b'\x00' * request.response_size)) -_CLIENT_UNARY_CALL = utilities.unary_unary_client_rpc_method( +_CLIENT_UNARY_CALL = utilities.unary_unary_invocation_description( messages_pb2.SimpleRequest.SerializeToString, messages_pb2.SimpleResponse.FromString) -_SERVER_UNARY_CALL = utilities.unary_unary_server_rpc_method( +_SERVER_UNARY_CALL = utilities.unary_unary_service_description( _unary_call, messages_pb2.SimpleRequest.FromString, messages_pb2.SimpleResponse.SerializeToString) -def _streaming_output_call(request): +def _streaming_output_call(request, unused_context): for response_parameters in request.response_parameters: yield messages_pb2.StreamingOutputCallResponse( payload=messages_pb2.Payload( type=request.response_type, body=b'\x00' * response_parameters.size)) -_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_client_rpc_method( +_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_invocation_description( messages_pb2.StreamingOutputCallRequest.SerializeToString, messages_pb2.StreamingOutputCallResponse.FromString) -_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_server_rpc_method( +_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_service_description( _streaming_output_call, messages_pb2.StreamingOutputCallRequest.FromString, messages_pb2.StreamingOutputCallResponse.SerializeToString) -def _streaming_input_call(request_iterator): +def _streaming_input_call(request_iterator, unused_context): aggregate_size = 0 for request in request_iterator: if request.payload and request.payload.body: @@ -82,35 +82,35 @@ def _streaming_input_call(request_iterator): return messages_pb2.StreamingInputCallResponse( aggregated_payload_size=aggregate_size) -_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_client_rpc_method( +_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_invocation_description( messages_pb2.StreamingInputCallRequest.SerializeToString, messages_pb2.StreamingInputCallResponse.FromString) -_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_server_rpc_method( +_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_service_description( _streaming_input_call, messages_pb2.StreamingInputCallRequest.FromString, messages_pb2.StreamingInputCallResponse.SerializeToString) -def _full_duplex_call(request_iterator): +def _full_duplex_call(request_iterator, unused_context): for request in request_iterator: yield messages_pb2.StreamingOutputCallResponse( payload=messages_pb2.Payload( type=request.payload.type, body=b'\x00' * request.response_parameters[0].size)) -_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_client_rpc_method( +_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_invocation_description( messages_pb2.StreamingOutputCallRequest.SerializeToString, messages_pb2.StreamingOutputCallResponse.FromString) -_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_server_rpc_method( +_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_service_description( _full_duplex_call, messages_pb2.StreamingOutputCallRequest.FromString, messages_pb2.StreamingOutputCallResponse.SerializeToString) # NOTE(nathaniel): Apparently this is the same as the full-duplex call? -_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_client_rpc_method( +_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_invocation_description( messages_pb2.StreamingOutputCallRequest.SerializeToString, messages_pb2.StreamingOutputCallResponse.FromString) -_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_server_rpc_method( +_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_service_description( _full_duplex_call, messages_pb2.StreamingOutputCallRequest.FromString, messages_pb2.StreamingOutputCallResponse.SerializeToString) diff --git a/src/python/src/grpc/early_adopter/_assembly_utilities.py b/src/python/src/grpc/early_adopter/_assembly_utilities.py new file mode 100644 index 00000000000..facfc2bf0e6 --- /dev/null +++ b/src/python/src/grpc/early_adopter/_assembly_utilities.py @@ -0,0 +1,168 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import abc +import collections + +# assembly_interfaces is referenced from specification in this module. +from grpc.framework.assembly import interfaces as assembly_interfaces # pylint: disable=unused-import +from grpc.framework.assembly import utilities as assembly_utilities +from grpc.early_adopter import _reexport +from grpc.early_adopter import interfaces + + +# TODO(issue 726): Kill the "implementations" attribute of this in favor +# of the same-information-less-bogusly-represented "cardinalities". +class InvocationBreakdown(object): + """An intermediate representation of invocation-side views of RPC methods. + + Attributes: + cardinalities: A dictionary from RPC method name to interfaces.Cardinality + value. + implementations: A dictionary from RPC method name to + assembly_interfaces.MethodImplementation describing the method. + request_serializers: A dictionary from RPC method name to callable + behavior to be used serializing request values for the RPC. + response_deserializers: A dictionary from RPC method name to callable + behavior to be used deserializing response values for the RPC. + """ + __metaclass__ = abc.ABCMeta + + +class _EasyInvocationBreakdown( + InvocationBreakdown, + collections.namedtuple( + '_EasyInvocationBreakdown', + ('cardinalities', 'implementations', 'request_serializers', + 'response_deserializers'))): + pass + + +class ServiceBreakdown(object): + """An intermediate representation of service-side views of RPC methods. + + Attributes: + implementations: A dictionary from RPC method name + assembly_interfaces.MethodImplementation implementing the RPC method. + request_deserializers: A dictionary from RPC method name to callable + behavior to be used deserializing request values for the RPC. + response_serializers: A dictionary from RPC method name to callable + behavior to be used serializing response values for the RPC. + """ + __metaclass__ = abc.ABCMeta + + +class _EasyServiceBreakdown( + ServiceBreakdown, + collections.namedtuple( + '_EasyServiceBreakdown', + ('implementations', 'request_deserializers', 'response_serializers'))): + pass + + +def break_down_invocation(method_descriptions): + """Derives an InvocationBreakdown from several RPC method descriptions. + + Args: + method_descriptions: A dictionary from RPC method name to + interfaces.RpcMethodInvocationDescription describing the RPCs. + + Returns: + An InvocationBreakdown corresponding to the given method descriptions. + """ + cardinalities = {} + implementations = {} + request_serializers = {} + response_deserializers = {} + for name, method_description in method_descriptions.iteritems(): + cardinality = method_description.cardinality() + cardinalities[name] = cardinality + if cardinality is interfaces.Cardinality.UNARY_UNARY: + implementations[name] = assembly_utilities.unary_unary_inline(None) + elif cardinality is interfaces.Cardinality.UNARY_STREAM: + implementations[name] = assembly_utilities.unary_stream_inline(None) + elif cardinality is interfaces.Cardinality.STREAM_UNARY: + implementations[name] = assembly_utilities.stream_unary_inline(None) + elif cardinality is interfaces.Cardinality.STREAM_STREAM: + implementations[name] = assembly_utilities.stream_stream_inline(None) + request_serializers[name] = method_description.serialize_request + response_deserializers[name] = method_description.deserialize_response + return _EasyInvocationBreakdown( + cardinalities, implementations, request_serializers, + response_deserializers) + + +def break_down_service(method_descriptions): + """Derives a ServiceBreakdown from several RPC method descriptions. + + Args: + method_descriptions: A dictionary from RPC method name to + interfaces.RpcMethodServiceDescription describing the RPCs. + + Returns: + A ServiceBreakdown corresponding to the given method descriptions. + """ + implementations = {} + request_deserializers = {} + response_serializers = {} + for name, method_description in method_descriptions.iteritems(): + cardinality = method_description.cardinality() + if cardinality is interfaces.Cardinality.UNARY_UNARY: + def service( + request, face_rpc_context, + service_behavior=method_description.service_unary_unary): + return service_behavior( + request, _reexport.rpc_context(face_rpc_context)) + implementations[name] = assembly_utilities.unary_unary_inline(service) + elif cardinality is interfaces.Cardinality.UNARY_STREAM: + def service( + request, face_rpc_context, + service_behavior=method_description.service_unary_stream): + return service_behavior( + request, _reexport.rpc_context(face_rpc_context)) + implementations[name] = assembly_utilities.unary_stream_inline(service) + elif cardinality is interfaces.Cardinality.STREAM_UNARY: + def service( + request_iterator, face_rpc_context, + service_behavior=method_description.service_stream_unary): + return service_behavior( + request_iterator, _reexport.rpc_context(face_rpc_context)) + implementations[name] = assembly_utilities.stream_unary_inline(service) + elif cardinality is interfaces.Cardinality.STREAM_STREAM: + def service( + request_iterator, face_rpc_context, + service_behavior=method_description.service_stream_stream): + return service_behavior( + request_iterator, _reexport.rpc_context(face_rpc_context)) + implementations[name] = assembly_utilities.stream_stream_inline(service) + request_deserializers[name] = method_description.deserialize_request + response_serializers[name] = method_description.serialize_response + + return _EasyServiceBreakdown( + implementations, request_deserializers, response_serializers) diff --git a/src/python/src/grpc/early_adopter/_face_utilities.py b/src/python/src/grpc/early_adopter/_face_utilities.py deleted file mode 100644 index 3e37b08752b..00000000000 --- a/src/python/src/grpc/early_adopter/_face_utilities.py +++ /dev/null @@ -1,178 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import abc -import collections - -from grpc.framework.face import interfaces as face_interfaces - -from grpc.early_adopter import interfaces - - -class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod): - - def __init__(self, unary_unary_server_rpc_method): - self._method = unary_unary_server_rpc_method - - def service(self, request, context): - """See face_interfaces.InlineValueInValueOutMethod.service for spec.""" - return self._method.service_unary_unary(request) - - -class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod): - - def __init__(self, unary_stream_server_rpc_method): - self._method = unary_stream_server_rpc_method - - def service(self, request, context): - """See face_interfaces.InlineValueInStreamOutMethod.service for spec.""" - return self._method.service_unary_stream(request) - - -class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod): - - def __init__(self, stream_unary_server_rpc_method): - self._method = stream_unary_server_rpc_method - - def service(self, request_iterator, context): - """See face_interfaces.InlineStreamInValueOutMethod.service for spec.""" - return self._method.service_stream_unary(request_iterator) - - -class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod): - - def __init__(self, stream_stream_server_rpc_method): - self._method = stream_stream_server_rpc_method - - def service(self, request_iterator, context): - """See face_interfaces.InlineStreamInStreamOutMethod.service for spec.""" - return self._method.service_stream_stream(request_iterator) - - -class ClientBreakdown(object): - """An intermediate representation of invocation-side views of RPC methods. - - Attributes: - request_serializers: A dictionary from RPC method name to callable - behavior to be used serializing request values for the RPC. - response_deserializers: A dictionary from RPC method name to callable - behavior to be used deserializing response values for the RPC. - """ - __metaclass__ = abc.ABCMeta - - -class _EasyClientBreakdown( - ClientBreakdown, - collections.namedtuple( - '_EasyClientBreakdown', - ('request_serializers', 'response_deserializers'))): - pass - - -class ServerBreakdown(object): - """An intermediate representation of implementations of RPC methods. - - Attributes: - unary_unary_methods: A dictionary from RPC method name to callable - behavior implementing the RPC method for unary-unary RPC methods. - unary_stream_methods: A dictionary from RPC method name to callable - behavior implementing the RPC method for unary-stream RPC methods. - stream_unary_methods: A dictionary from RPC method name to callable - behavior implementing the RPC method for stream-unary RPC methods. - stream_stream_methods: A dictionary from RPC method name to callable - behavior implementing the RPC method for stream-stream RPC methods. - request_deserializers: A dictionary from RPC method name to callable - behavior to be used deserializing request values for the RPC. - response_serializers: A dictionary from RPC method name to callable - behavior to be used serializing response values for the RPC. - """ - __metaclass__ = abc.ABCMeta - - - -class _EasyServerBreakdown( - ServerBreakdown, - collections.namedtuple( - '_EasyServerBreakdown', - ('unary_unary_methods', 'unary_stream_methods', 'stream_unary_methods', - 'stream_stream_methods', 'request_deserializers', - 'response_serializers'))): - pass - - -def client_break_down(methods): - """Derives a ClientBreakdown from several interfaces.ClientRpcMethods. - - Args: - methods: A dictionary from RPC mthod name to - interfaces.ClientRpcMethod object describing the RPCs. - - Returns: - A ClientBreakdown corresponding to the given methods. - """ - request_serializers = {} - response_deserializers = {} - for name, method in methods.iteritems(): - request_serializers[name] = method.serialize_request - response_deserializers[name] = method.deserialize_response - return _EasyClientBreakdown(request_serializers, response_deserializers) - - -def server_break_down(methods): - """Derives a ServerBreakdown from several interfaces.ServerRpcMethods. - - Args: - methods: A dictionary from RPC mthod name to - interfaces.ServerRpcMethod object describing the RPCs. - - Returns: - A ServerBreakdown corresponding to the given methods. - """ - unary_unary = {} - unary_stream = {} - stream_unary = {} - stream_stream = {} - request_deserializers = {} - response_serializers = {} - for name, method in methods.iteritems(): - cardinality = method.cardinality() - if cardinality is interfaces.Cardinality.UNARY_UNARY: - unary_unary[name] = _InlineUnaryUnaryMethod(method) - elif cardinality is interfaces.Cardinality.UNARY_STREAM: - unary_stream[name] = _InlineUnaryStreamMethod(method) - elif cardinality is interfaces.Cardinality.STREAM_UNARY: - stream_unary[name] = _InlineStreamUnaryMethod(method) - elif cardinality is interfaces.Cardinality.STREAM_STREAM: - stream_stream[name] = _InlineStreamStreamMethod(method) - request_deserializers[name] = method.deserialize_request - response_serializers[name] = method.serialize_response - - return _EasyServerBreakdown( - unary_unary, unary_stream, stream_unary, stream_stream, - request_deserializers, response_serializers) diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/early_adopter/_reexport.py new file mode 100644 index 00000000000..35855bc9c80 --- /dev/null +++ b/src/python/src/grpc/early_adopter/_reexport.py @@ -0,0 +1,207 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import abc +import collections + +from grpc.framework.face import exceptions as face_exceptions +from grpc.framework.face import interfaces as face_interfaces +from grpc.framework.foundation import future +from grpc.early_adopter import exceptions +from grpc.early_adopter import interfaces + +_ABORTION_REEXPORT = { + face_interfaces.Abortion.CANCELLED: interfaces.Abortion.CANCELLED, + face_interfaces.Abortion.EXPIRED: interfaces.Abortion.EXPIRED, + face_interfaces.Abortion.NETWORK_FAILURE: + interfaces.Abortion.NETWORK_FAILURE, + face_interfaces.Abortion.SERVICED_FAILURE: + interfaces.Abortion.SERVICED_FAILURE, + face_interfaces.Abortion.SERVICER_FAILURE: + interfaces.Abortion.SERVICER_FAILURE, +} + + +class _RpcError(exceptions.RpcError): + pass + + +def _reexport_error(face_rpc_error): + if isinstance(face_rpc_error, face_exceptions.CancellationError): + return exceptions.CancellationError() + elif isinstance(face_rpc_error, face_exceptions.ExpirationError): + return exceptions.ExpirationError() + else: + return _RpcError() + + +def _as_face_abortion_callback(abortion_callback): + def face_abortion_callback(face_abortion): + abortion_callback(_ABORTION_REEXPORT[face_abortion]) + return face_abortion_callback + + +class _ReexportedFuture(future.Future): + + def __init__(self, face_future): + self._face_future = face_future + + def cancel(self): + return self._face_future.cancel() + + def cancelled(self): + return self._face_future.cancelled() + + def running(self): + return self._face_future.running() + + def done(self): + return self._face_future.done() + + def result(self, timeout=None): + try: + return self._face_future.result(timeout=timeout) + except face_exceptions.RpcError as e: + raise _reexport_error(e) + + def exception(self, timeout=None): + face_error = self._face_future.exception(timeout=timeout) + return None if face_error is None else _reexport_error(face_error) + + def traceback(self, timeout=None): + return self._face_future.traceback(timeout=timeout) + + def add_done_callback(self, fn): + self._face_future.add_done_callback(lambda unused_face_future: fn(self)) + + +def _call_reexporting_errors(behavior, *args, **kwargs): + try: + return behavior(*args, **kwargs) + except face_exceptions.RpcError as e: + raise _reexport_error(e) + + +def _reexported_future(face_future): + return _ReexportedFuture(face_future) + + +class _CancellableIterator(interfaces.CancellableIterator): + + def __init__(self, face_cancellable_iterator): + self._face_cancellable_iterator = face_cancellable_iterator + + def __iter__(self): + return self + + def next(self): + return _call_reexporting_errors(self._face_cancellable_iterator.next) + + def cancel(self): + self._face_cancellable_iterator.cancel() + + +class _RpcContext(interfaces.RpcContext): + + def __init__(self, face_rpc_context): + self._face_rpc_context = face_rpc_context + + def is_active(self): + return self._face_rpc_context.is_active() + + def time_remaining(self): + return self._face_rpc_context.time_remaining() + + def add_abortion_callback(self, abortion_callback): + self._face_rpc_context.add_abortion_callback( + _as_face_abortion_callback(abortion_callback)) + + +class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync): + + def __init__(self, face_unary_unary_sync_async): + self._underlying = face_unary_unary_sync_async + + def __call__(self, request, timeout): + return _call_reexporting_errors( + self._underlying, request, timeout) + + def async(self, request, timeout): + return _ReexportedFuture(self._underlying.async(request, timeout)) + + +class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync): + + def __init__(self, face_stream_unary_sync_async): + self._underlying = face_stream_unary_sync_async + + def __call__(self, request_iterator, timeout): + return _call_reexporting_errors( + self._underlying, request_iterator, timeout) + + def async(self, request_iterator, timeout): + return _ReexportedFuture(self._underlying.async(request_iterator, timeout)) + + +class _Stub(interfaces.Stub): + + def __init__(self, assembly_stub, cardinalities): + self._assembly_stub = assembly_stub + self._cardinalities = cardinalities + + def __enter__(self): + self._assembly_stub.__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._assembly_stub.__exit__(exc_type, exc_val, exc_tb) + return False + + def __getattr__(self, attr): + underlying_attr = self._assembly_stub.__getattr__(attr) + cardinality = self._cardinalities.get(attr) + if cardinality is interfaces.Cardinality.UNARY_UNARY: + return _UnaryUnarySyncAsync(underlying_attr) + elif cardinality is interfaces.Cardinality.UNARY_STREAM: + return lambda request, timeout: _CancellableIterator( + underlying_attr(request, timeout)) + elif cardinality is interfaces.Cardinality.STREAM_UNARY: + return _StreamUnarySyncAsync(underlying_attr) + elif cardinality is interfaces.Cardinality.STREAM_STREAM: + return lambda request_iterator, timeout: _CancellableIterator( + underlying_attr(request_iterator, timeout)) + else: + raise AttributeError(attr) + +def rpc_context(face_rpc_context): + return _RpcContext(face_rpc_context) + + +def stub(assembly_stub, cardinalities): + return _Stub(assembly_stub, cardinalities) diff --git a/src/python/src/grpc/early_adopter/exceptions.py b/src/python/src/grpc/early_adopter/exceptions.py new file mode 100644 index 00000000000..5234d3b91ce --- /dev/null +++ b/src/python/src/grpc/early_adopter/exceptions.py @@ -0,0 +1,48 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Exceptions raised by GRPC. + +Only GRPC should instantiate and raise these exceptions. +""" + +import abc + + +class RpcError(Exception): + """Common super type for all exceptions raised by GRPC.""" + __metaclass__ = abc.ABCMeta + + +class CancellationError(RpcError): + """Indicates that an RPC has been cancelled.""" + + +class ExpirationError(RpcError): + """Indicates that an RPC has expired ("timed out").""" diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index 1d76d0f9e0d..241ed7dcdbb 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -31,15 +31,12 @@ import threading -from grpc._adapter import fore -from grpc.framework.base.packets import implementations as _tickets_implementations -from grpc.framework.face import implementations as _face_implementations -from grpc.framework.foundation import logging_pool -from grpc.early_adopter import _face_utilities +from grpc._adapter import fore as _fore +from grpc._adapter import rear as _rear +from grpc.early_adopter import _assembly_utilities +from grpc.early_adopter import _reexport from grpc.early_adopter import interfaces - -_MEGA_TIMEOUT = 60 * 60 * 24 -_THREAD_POOL_SIZE = 80 +from grpc.framework.assembly import implementations as _assembly_implementations class _Server(interfaces.Server): @@ -48,63 +45,120 @@ class _Server(interfaces.Server): self._lock = threading.Lock() self._breakdown = breakdown self._port = port - self._private_key = private_key - self._certificate_chain = certificate_chain + if private_key is None or certificate_chain is None: + self._key_chain_pairs = () + else: + self._key_chain_pairs = ((private_key, certificate_chain),) - self._pool = None self._fore_link = None - self._back = None + self._server = None - def start(self): - """See interfaces.Server.start for specification.""" + def _start(self): with self._lock: - if self._pool is None: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - servicer = _face_implementations.servicer( - self._pool, - inline_value_in_value_out_methods=self._breakdown.unary_unary_methods, - inline_value_in_stream_out_methods=self._breakdown.unary_stream_methods, - inline_stream_in_value_out_methods=self._breakdown.stream_unary_methods, - inline_stream_in_stream_out_methods=self._breakdown.stream_stream_methods) - self._fore_link = fore.ForeLink( - self._pool, self._breakdown.request_deserializers, - self._breakdown.response_serializers, None, - ((self._private_key, self._certificate_chain),), port=self._port) - self._fore_link.start() - port = self._fore_link.port() - self._back = _tickets_implementations.back( - servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT, - _MEGA_TIMEOUT) - self._fore_link.join_rear_link(self._back) - self._back.join_fore_link(self._fore_link) - return port + if self._server is None: + self._fore_link = _fore.activated_fore_link( + self._port, self._breakdown.request_deserializers, + self._breakdown.response_serializers, None, self._key_chain_pairs) + + self._server = _assembly_implementations.assemble_service( + self._breakdown.implementations, self._fore_link) + self._server.start() else: raise ValueError('Server currently running!') - def stop(self): - """See interfaces.Server.stop for specification.""" + def _stop(self): with self._lock: - if self._pool is None: + if self._server is None: raise ValueError('Server not running!') else: - self._fore_link.stop() - self._pool.shutdown(wait=True) - self._pool = None + self._server.stop() + self._server = None + self._fore_link = None + + def __enter__(self): + self._start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._stop() + return False + + def start(self): + self._start() + + def stop(self): + self._stop() + + def port(self): + with self._lock: + return self._fore_link.port() + +def _build_stub( + methods, host, port, root_certificates, private_key, certificate_chain): + breakdown = _assembly_utilities.break_down_invocation(methods) + # TODO(nathaniel): pass security values. + activated_rear_link = _rear.activated_rear_link( + host, port, breakdown.request_serializers, + breakdown.response_deserializers) + assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub( + breakdown.implementations, activated_rear_link) + return _reexport.stub(assembly_stub, breakdown.cardinalities) def _build_server(methods, port, private_key, certificate_chain): - breakdown = _face_utilities.server_break_down(methods) + breakdown = _assembly_utilities.break_down_service(methods) return _Server(breakdown, port, private_key, certificate_chain) +def insecure_stub(methods, host, port): + """Constructs an insecure interfaces.Stub. + + Args: + methods: A dictionary from RPC method name to + interfaces.RpcMethodInvocationDescription describing the RPCs to be + supported by the created stub. + host: The host to which to connect for RPC service. + port: The port to which to connect for RPC service. + + Returns: + An interfaces.Stub affording RPC invocation. + """ + return _build_stub(methods, host, port, None, None, None) + + +def secure_stub( + methods, host, port, root_certificates, private_key, certificate_chain): + """Constructs an insecure interfaces.Stub. + + Args: + methods: A dictionary from RPC method name to + interfaces.RpcMethodInvocationDescription describing the RPCs to be + supported by the created stub. + host: The host to which to connect for RPC service. + port: The port to which to connect for RPC service. + root_certificates: The PEM-encoded root certificates or None to ask for + them to be retrieved from a default location. + private_key: The PEM-encoded private key to use or None if no private key + should be used. + certificate_chain: The PEM-encoded certificate chain to use or None if no + certificate chain should be used. + + Returns: + An interfaces.Stub affording RPC invocation. + """ + return _build_stub( + methods, host, port, root_certificates, private_key, certificate_chain) + + def insecure_server(methods, port): """Constructs an insecure interfaces.Server. Args: methods: A dictionary from RPC method name to - interfaces.ServerRpcMethod object describing the RPCs to + interfaces.RpcMethodServiceDescription describing the RPCs to be serviced by the created server. - port: The port on which to serve. + port: The desired port on which to serve or zero to ask for a port to + be automatically selected. Returns: An interfaces.Server that will run with no security and @@ -118,9 +172,10 @@ def secure_server(methods, port, private_key, certificate_chain): Args: methods: A dictionary from RPC method name to - interfaces.ServerRpcMethod object describing the RPCs to + interfaces.RpcMethodServiceDescription describing the RPCs to be serviced by the created server. - port: The port on which to serve. + port: The port on which to serve or zero to ask for a port to be + automatically selected. private_key: A pem-encoded private key. certificate_chain: A pem-encoded certificate chain. diff --git a/src/python/src/grpc/early_adopter/implementations_test.py b/src/python/src/grpc/early_adopter/implementations_test.py new file mode 100644 index 00000000000..9ef06c32cbc --- /dev/null +++ b/src/python/src/grpc/early_adopter/implementations_test.py @@ -0,0 +1,176 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# TODO(nathaniel): Expand this test coverage. + +"""Test of the GRPC-backed ForeLink and RearLink.""" + +import unittest + +from grpc.early_adopter import implementations +from grpc.early_adopter import utilities +from grpc._junkdrawer import math_pb2 + +DIV = 'Div' +DIV_MANY = 'DivMany' +FIB = 'Fib' +SUM = 'Sum' + +def _fibbonacci(limit): + left, right = 0, 1 + for _ in xrange(limit): + yield left + left, right = right, left + right + + +def _div(request, unused_context): + return math_pb2.DivReply( + quotient=request.dividend / request.divisor, + remainder=request.dividend % request.divisor) + + +def _div_many(request_iterator, unused_context): + for request in request_iterator: + yield math_pb2.DivReply( + quotient=request.dividend / request.divisor, + remainder=request.dividend % request.divisor) + + +def _fib(request, unused_context): + for number in _fibbonacci(request.limit): + yield math_pb2.Num(num=number) + + +def _sum(request_iterator, unused_context): + accumulation = 0 + for request in request_iterator: + accumulation += request.num + return math_pb2.Num(num=accumulation) + + +_INVOCATION_DESCRIPTIONS = { + DIV: utilities.unary_unary_invocation_description( + math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString), + DIV_MANY: utilities.stream_stream_invocation_description( + math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString), + FIB: utilities.unary_stream_invocation_description( + math_pb2.FibArgs.SerializeToString, math_pb2.Num.FromString), + SUM: utilities.stream_unary_invocation_description( + math_pb2.Num.SerializeToString, math_pb2.Num.FromString), +} + +_SERVICE_DESCRIPTIONS = { + DIV: utilities.unary_unary_service_description( + _div, math_pb2.DivArgs.FromString, + math_pb2.DivReply.SerializeToString), + DIV_MANY: utilities.stream_stream_service_description( + _div_many, math_pb2.DivArgs.FromString, + math_pb2.DivReply.SerializeToString), + FIB: utilities.unary_stream_service_description( + _fib, math_pb2.FibArgs.FromString, math_pb2.Num.SerializeToString), + SUM: utilities.stream_unary_service_description( + _sum, math_pb2.Num.FromString, math_pb2.Num.SerializeToString), +} + +_TIMEOUT = 3 + + +class EarlyAdopterImplementationsTest(unittest.TestCase): + + def setUp(self): + self.server = implementations.insecure_server(_SERVICE_DESCRIPTIONS, 0) + self.server.start() + port = self.server.port() + self.stub = implementations.insecure_stub(_INVOCATION_DESCRIPTIONS, 'localhost', port) + + def tearDown(self): + self.server.stop() + + def testUpAndDown(self): + with self.stub: + pass + + def testUnaryUnary(self): + divisor = 59 + dividend = 973 + expected_quotient = dividend / divisor + expected_remainder = dividend % divisor + + with self.stub: + response = self.stub.Div( + math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT) + self.assertEqual(expected_quotient, response.quotient) + self.assertEqual(expected_remainder, response.remainder) + + def testUnaryStream(self): + stream_length = 43 + + with self.stub: + response_iterator = self.stub.Fib( + math_pb2.FibArgs(limit=stream_length), _TIMEOUT) + numbers = tuple(response.num for response in response_iterator) + for early, middle, later in zip(numbers, numbers[:1], numbers[:2]): + self.assertEqual(early + middle, later) + self.assertEqual(stream_length, len(numbers)) + + def testStreamUnary(self): + stream_length = 127 + + with self.stub: + response_future = self.stub.Sum.async( + (math_pb2.Num(num=index) for index in range(stream_length)), + _TIMEOUT) + self.assertEqual( + (stream_length * (stream_length - 1)) / 2, + response_future.result().num) + + def testStreamStream(self): + stream_length = 179 + divisor_offset = 71 + dividend_offset = 1763 + + with self.stub: + response_iterator = self.stub.DivMany( + (math_pb2.DivArgs( + divisor=divisor_offset + index, + dividend=dividend_offset + index) + for index in range(stream_length)), + _TIMEOUT) + for index, response in enumerate(response_iterator): + self.assertEqual( + (dividend_offset + index) / (divisor_offset + index), + response.quotient) + self.assertEqual( + (dividend_offset + index) % (divisor_offset + index), + response.remainder) + self.assertEqual(stream_length, index + 1) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/grpc/early_adopter/interfaces.py b/src/python/src/grpc/early_adopter/interfaces.py index 0ec371f8e88..b733873c1c9 100644 --- a/src/python/src/grpc/early_adopter/interfaces.py +++ b/src/python/src/grpc/early_adopter/interfaces.py @@ -32,6 +32,11 @@ import abc import enum +# exceptions is referenced from specification in this module. +from grpc.early_adopter import exceptions # pylint: disable=unused-import +from grpc.framework.foundation import activated +from grpc.framework.foundation import future + @enum.unique class Cardinality(enum.Enum): @@ -43,24 +48,166 @@ class Cardinality(enum.Enum): STREAM_STREAM = 'request-streaming/response-streaming' -class RpcMethod(object): - """A type for the common aspects of RPC method specifications.""" +@enum.unique +class Abortion(enum.Enum): + """Categories of RPC abortion.""" + + CANCELLED = 'cancelled' + EXPIRED = 'expired' + NETWORK_FAILURE = 'network failure' + SERVICED_FAILURE = 'serviced failure' + SERVICER_FAILURE = 'servicer failure' + + +class CancellableIterator(object): + """Implements the Iterator protocol and affords a cancel method.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __iter__(self): + """Returns the self object in accordance with the Iterator protocol.""" + raise NotImplementedError() + + @abc.abstractmethod + def next(self): + """Returns a value or raises StopIteration per the Iterator protocol.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Requests cancellation of whatever computation underlies this iterator.""" + raise NotImplementedError() + + +class RpcContext(object): + """Provides RPC-related information and action.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def is_active(self): + """Describes whether the RPC is active or has terminated.""" + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the RPC. + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have timed + out. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_abortion_callback(self, abortion_callback): + """Registers a callback to be called if the RPC is aborted. + Args: + abortion_callback: A callable to be called and passed an Abortion value + in the event of RPC abortion. + """ + raise NotImplementedError() + + +class UnaryUnarySyncAsync(object): + """Affords invoking a unary-unary RPC synchronously or asynchronously. + Values implementing this interface are directly callable and present an + "async" method. Both calls take a request value and a numeric timeout. + Direct invocation of a value of this type invokes its associated RPC and + blocks until the RPC's response is available. Calling the "async" method + of a value of this type invokes its associated RPC and immediately returns a + future.Future bound to the asynchronous execution of the RPC. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __call__(self, request, timeout): + """Synchronously invokes the underlying RPC. + Args: + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + Returns: + The response value for the RPC. + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def async(self, request, timeout): + """Asynchronously invokes the underlying RPC. + Args: + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future's result value will be the response value of the RPC. + In the event of RPC abortion, the returned Future's exception value + will be an exceptions.RpcError. + """ + raise NotImplementedError() + + +class StreamUnarySyncAsync(object): + """Affords invoking a stream-unary RPC synchronously or asynchronously. + Values implementing this interface are directly callable and present an + "async" method. Both calls take an iterator of request values and a numeric + timeout. Direct invocation of a value of this type invokes its associated RPC + and blocks until the RPC's response is available. Calling the "async" method + of a value of this type invokes its associated RPC and immediately returns a + future.Future bound to the asynchronous execution of the RPC. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __call__(self, request_iterator, timeout): + """Synchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + The response value for the RPC. + + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def async(self, request_iterator, timeout): + """Asynchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future's result value will be the response value of the RPC. + In the event of RPC abortion, the returned Future's exception value + will be an exceptions.RpcError. + """ + raise NotImplementedError() + + +class RpcMethodDescription(object): + """A type for the common aspects of RPC method descriptions.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod def cardinality(self): - """Identifies the cardinality of this RpcMethod. + """Identifies the cardinality of this RpcMethodDescription. Returns: A Cardinality value identifying whether or not this - RpcMethod is request-unary or request-streaming and - whether or not it is response-unary or - response-streaming. + RpcMethodDescription is request-unary or request-streaming and + whether or not it is response-unary or response-streaming. """ raise NotImplementedError() -class ClientRpcMethod(RpcMethod): +class RpcMethodInvocationDescription(RpcMethodDescription): """Invocation-side description of an RPC method.""" __metaclass__ = abc.ABCMeta @@ -69,7 +216,8 @@ class ClientRpcMethod(RpcMethod): """Serializes a request value. Args: - request: A request value appropriate for this RpcMethod. + request: A request value appropriate for the RPC method described by this + RpcMethodInvocationDescription. Returns: The serialization of the given request value as a @@ -82,9 +230,9 @@ class ClientRpcMethod(RpcMethod): """Deserializes a response value. Args: - serialized_response: A bytestring that is the - serialization of a response value appropriate for this - RpcMethod. + serialized_response: A bytestring that is the serialization of a response + value appropriate for the RPC method described by this + RpcMethodInvocationDescription. Returns: A response value corresponding to the given bytestring. @@ -92,7 +240,7 @@ class ClientRpcMethod(RpcMethod): raise NotImplementedError() -class ServerRpcMethod(RpcMethod): +class RpcMethodServiceDescription(RpcMethodDescription): """Service-side description of an RPC method.""" __metaclass__ = abc.ABCMeta @@ -101,9 +249,9 @@ class ServerRpcMethod(RpcMethod): """Deserializes a request value. Args: - serialized_request: A bytestring that is the - serialization of a request value appropriate for this - RpcMethod. + serialized_request: A bytestring that is the serialization of a request + value appropriate for the RPC method described by this + RpcMethodServiceDescription. Returns: A request value corresponding to the given bytestring. @@ -115,7 +263,8 @@ class ServerRpcMethod(RpcMethod): """Serializes a response value. Args: - response: A response value appropriate for this RpcMethod. + response: A response value appropriate for the RPC method described by + this RpcMethodServiceDescription. Returns: The serialization of the given response value as a @@ -124,80 +273,116 @@ class ServerRpcMethod(RpcMethod): raise NotImplementedError() @abc.abstractmethod - def service_unary_unary(self, request): + def service_unary_unary(self, request, context): """Carries out this RPC. This method may only be called if the cardinality of this - RpcMethod is Cardinality.UNARY_UNARY. + RpcMethodServiceDescription is Cardinality.UNARY_UNARY. Args: - request: A request value appropriate for this RpcMethod. + request: A request value appropriate for the RPC method described by this + RpcMethodServiceDescription. + context: An RpcContext object for the RPC. Returns: - A response value appropriate for this RpcMethod. + A response value appropriate for the RPC method described by this + RpcMethodServiceDescription. """ raise NotImplementedError() @abc.abstractmethod - def service_unary_stream(self, request): + def service_unary_stream(self, request, context): """Carries out this RPC. This method may only be called if the cardinality of this - RpcMethod is Cardinality.UNARY_STREAM. + RpcMethodServiceDescription is Cardinality.UNARY_STREAM. Args: - request: A request value appropriate for this RpcMethod. + request: A request value appropriate for the RPC method described by this + RpcMethodServiceDescription. + context: An RpcContext object for the RPC. Yields: - Zero or more response values appropriate for this - RpcMethod. + Zero or more response values appropriate for the RPC method described by + this RpcMethodServiceDescription. """ raise NotImplementedError() @abc.abstractmethod - def service_stream_unary(self, request_iterator): + def service_stream_unary(self, request_iterator, context): """Carries out this RPC. This method may only be called if the cardinality of this - RpcMethod is Cardinality.STREAM_UNARY. + RpcMethodServiceDescription is Cardinality.STREAM_UNARY. Args: - request_iterator: An iterator of request values - appropriate for this RpcMethod. + request_iterator: An iterator of request values appropriate for the RPC + method described by this RpcMethodServiceDescription. + context: An RpcContext object for the RPC. Returns: - A response value appropriate for this RpcMethod. + A response value appropriate for the RPC method described by this + RpcMethodServiceDescription. """ raise NotImplementedError() @abc.abstractmethod - def service_stream_stream(self, request_iterator): + def service_stream_stream(self, request_iterator, context): """Carries out this RPC. This method may only be called if the cardinality of this - RpcMethod is Cardinality.STREAM_STREAM. + RpcMethodServiceDescription is Cardinality.STREAM_STREAM. Args: - request_iterator: An iterator of request values - appropriate for this RpcMethod. + request_iterator: An iterator of request values appropriate for the RPC + method described by this RpcMethodServiceDescription. + context: An RpcContext object for the RPC. Yields: - Zero or more response values appropraite for this - RpcMethod. + Zero or more response values appropriate for the RPC method described by + this RpcMethodServiceDescription. """ raise NotImplementedError() -class Server(object): +class Stub(object): + """A stub with callable RPC method names for attributes. + + Instances of this type are context managers and only afford RPC invocation + when used in context. + + Instances of this type, when used in context, respond to attribute access + as follows: if the requested attribute is the name of a unary-unary RPC + method, the value of the attribute will be a UnaryUnarySyncAsync with which + to invoke the RPC method. If the requested attribute is the name of a + unary-stream RPC method, the value of the attribute will be a callable taking + a request object and a timeout parameter and returning a CancellableIterator + that yields the response values of the RPC. If the requested attribute is the + name of a stream-unary RPC method, the value of the attribute will be a + StreamUnarySyncAsync with which to invoke the RPC method. If the requested + attribute is the name of a stream-stream RPC method, the value of the + attribute will be a callable taking an iterator of request objects and a + timeout and returning a CancellableIterator that yields the response values + of the RPC. + + In all cases indication of abortion is indicated by raising of + exceptions.RpcError, exceptions.CancellationError, + and exceptions.ExpirationError. + """ + __metaclass__ = abc.ABCMeta + + +class Server(activated.Activated): """A GRPC Server.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def start(self): - """Instructs this server to commence service of RPCs.""" - raise NotImplementedError() + def port(self): + """Reports the port on which the server is serving. - @abc.abstractmethod - def stop(self): - """Instructs this server to halt service of RPCs.""" + This method may only be called while the server is activated. + + Returns: + The port on which the server is serving. + """ raise NotImplementedError() diff --git a/src/python/src/grpc/early_adopter/utilities.py b/src/python/src/grpc/early_adopter/utilities.py index 9277d3f6ad4..da8ef825aa5 100644 --- a/src/python/src/grpc/early_adopter/utilities.py +++ b/src/python/src/grpc/early_adopter/utilities.py @@ -32,7 +32,9 @@ from grpc.early_adopter import interfaces -class _RpcMethod(interfaces.ClientRpcMethod, interfaces.ServerRpcMethod): +class _RpcMethodDescription( + interfaces.RpcMethodInvocationDescription, + interfaces.RpcMethodServiceDescription): def __init__( self, cardinality, unary_unary, unary_stream, stream_unary, @@ -49,44 +51,45 @@ class _RpcMethod(interfaces.ClientRpcMethod, interfaces.ServerRpcMethod): self._response_deserializer = response_deserializer def cardinality(self): - """See interfaces.RpcMethod.cardinality for specification.""" + """See interfaces.RpcMethodDescription.cardinality for specification.""" return self._cardinality def serialize_request(self, request): - """See interfaces.RpcMethod.serialize_request for specification.""" + """See interfaces.RpcMethodInvocationDescription.serialize_request.""" return self._request_serializer(request) def deserialize_request(self, serialized_request): - """See interfaces.RpcMethod.deserialize_request for specification.""" + """See interfaces.RpcMethodServiceDescription.deserialize_request.""" return self._request_deserializer(serialized_request) def serialize_response(self, response): - """See interfaces.RpcMethod.serialize_response for specification.""" + """See interfaces.RpcMethodServiceDescription.serialize_response.""" return self._response_serializer(response) def deserialize_response(self, serialized_response): - """See interfaces.RpcMethod.deserialize_response for specification.""" + """See interfaces.RpcMethodInvocationDescription.deserialize_response.""" return self._response_deserializer(serialized_response) - def service_unary_unary(self, request): - """See interfaces.RpcMethod.service_unary_unary for specification.""" - return self._unary_unary(request) + def service_unary_unary(self, request, context): + """See interfaces.RpcMethodServiceDescription.service_unary_unary.""" + return self._unary_unary(request, context) - def service_unary_stream(self, request): - """See interfaces.RpcMethod.service_unary_stream for specification.""" - return self._unary_stream(request) + def service_unary_stream(self, request, context): + """See interfaces.RpcMethodServiceDescription.service_unary_stream.""" + return self._unary_stream(request, context) - def service_stream_unary(self, request_iterator): - """See interfaces.RpcMethod.service_stream_unary for specification.""" - return self._stream_unary(request_iterator) + def service_stream_unary(self, request_iterator, context): + """See interfaces.RpcMethodServiceDescription.service_stream_unary.""" + return self._stream_unary(request_iterator, context) - def service_stream_stream(self, request_iterator): - """See interfaces.RpcMethod.service_stream_stream for specification.""" - return self._stream_stream(request_iterator) + def service_stream_stream(self, request_iterator, context): + """See interfaces.RpcMethodServiceDescription.service_stream_stream.""" + return self._stream_stream(request_iterator, context) -def unary_unary_client_rpc_method(request_serializer, response_deserializer): - """Constructs an interfaces.ClientRpcMethod for a unary-unary RPC method. +def unary_unary_invocation_description( + request_serializer, response_deserializer): + """Creates an interfaces.RpcMethodInvocationDescription for an RPC method. Args: request_serializer: A callable that when called on a request @@ -96,17 +99,17 @@ def unary_unary_client_rpc_method(request_serializer, response_deserializer): that bytestring. Returns: - An interfaces.ClientRpcMethod constructed from the given - arguments representing a unary-request/unary-response RPC - method. + An interfaces.RpcMethodInvocationDescription constructed from the given + arguments representing a unary-request/unary-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.UNARY_UNARY, None, None, None, None, request_serializer, None, None, response_deserializer) -def unary_stream_client_rpc_method(request_serializer, response_deserializer): - """Constructs an interfaces.ClientRpcMethod for a unary-stream RPC method. +def unary_stream_invocation_description( + request_serializer, response_deserializer): + """Creates an interfaces.RpcMethodInvocationDescription for an RPC method. Args: request_serializer: A callable that when called on a request @@ -116,17 +119,17 @@ def unary_stream_client_rpc_method(request_serializer, response_deserializer): that bytestring. Returns: - An interfaces.ClientRpcMethod constructed from the given - arguments representing a unary-request/streaming-response - RPC method. + An interfaces.RpcMethodInvocationDescription constructed from the given + arguments representing a unary-request/streaming-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.UNARY_STREAM, None, None, None, None, request_serializer, None, None, response_deserializer) -def stream_unary_client_rpc_method(request_serializer, response_deserializer): - """Constructs an interfaces.ClientRpcMethod for a stream-unary RPC method. +def stream_unary_invocation_description( + request_serializer, response_deserializer): + """Creates an interfaces.RpcMethodInvocationDescription for an RPC method. Args: request_serializer: A callable that when called on a request @@ -136,17 +139,17 @@ def stream_unary_client_rpc_method(request_serializer, response_deserializer): that bytestring. Returns: - An interfaces.ClientRpcMethod constructed from the given - arguments representing a streaming-request/unary-response - RPC method. + An interfaces.RpcMethodInvocationDescription constructed from the given + arguments representing a streaming-request/unary-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.STREAM_UNARY, None, None, None, None, request_serializer, None, None, response_deserializer) -def stream_stream_client_rpc_method(request_serializer, response_deserializer): - """Constructs an interfaces.ClientRpcMethod for a stream-stream RPC method. +def stream_stream_invocation_description( + request_serializer, response_deserializer): + """Creates an interfaces.RpcMethodInvocationDescription for an RPC method. Args: request_serializer: A callable that when called on a request @@ -156,23 +159,23 @@ def stream_stream_client_rpc_method(request_serializer, response_deserializer): that bytestring. Returns: - An interfaces.ClientRpcMethod constructed from the given - arguments representing a - streaming-request/streaming-response RPC method. + An interfaces.RpcMethodInvocationDescription constructed from the given + arguments representing a streaming-request/streaming-response RPC + method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.STREAM_STREAM, None, None, None, None, request_serializer, None, None, response_deserializer) -def unary_unary_server_rpc_method( +def unary_unary_service_description( behavior, request_deserializer, response_serializer): - """Constructs an interfaces.ServerRpcMethod for the given behavior. + """Creates an interfaces.RpcMethodServiceDescription for the given behavior. Args: behavior: A callable that implements a unary-unary RPC - method that accepts a single request and returns a single - response. + method that accepts a single request and an interfaces.RpcContext and + returns a single response. request_deserializer: A callable that when called on a bytestring returns the request value corresponding to that bytestring. @@ -181,23 +184,23 @@ def unary_unary_server_rpc_method( that value. Returns: - An interfaces.ServerRpcMethod constructed from the given + An interfaces.RpcMethodServiceDescription constructed from the given arguments representing a unary-request/unary-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.UNARY_UNARY, behavior, None, None, None, None, request_deserializer, response_serializer, None) -def unary_stream_server_rpc_method( +def unary_stream_service_description( behavior, request_deserializer, response_serializer): - """Constructs an interfaces.ServerRpcMethod for the given behavior. + """Creates an interfaces.RpcMethodServiceDescription for the given behavior. Args: behavior: A callable that implements a unary-stream RPC - method that accepts a single request and returns an - iterator of zero or more responses. + method that accepts a single request and an interfaces.RpcContext + and returns an iterator of zero or more responses. request_deserializer: A callable that when called on a bytestring returns the request value corresponding to that bytestring. @@ -206,23 +209,23 @@ def unary_stream_server_rpc_method( that value. Returns: - An interfaces.ServerRpcMethod constructed from the given + An interfaces.RpcMethodServiceDescription constructed from the given arguments representing a unary-request/streaming-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.UNARY_STREAM, None, behavior, None, None, None, request_deserializer, response_serializer, None) -def stream_unary_server_rpc_method( +def stream_unary_service_description( behavior, request_deserializer, response_serializer): - """Constructs an interfaces.ServerRpcMethod for the given behavior. + """Creates an interfaces.RpcMethodServiceDescription for the given behavior. Args: behavior: A callable that implements a stream-unary RPC method that accepts an iterator of zero or more requests - and returns a single response. + and an interfaces.RpcContext and returns a single response. request_deserializer: A callable that when called on a bytestring returns the request value corresponding to that bytestring. @@ -231,23 +234,24 @@ def stream_unary_server_rpc_method( that value. Returns: - An interfaces.ServerRpcMethod constructed from the given + An interfaces.RpcMethodServiceDescription constructed from the given arguments representing a streaming-request/unary-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.STREAM_UNARY, None, None, behavior, None, None, request_deserializer, response_serializer, None) -def stream_stream_server_rpc_method( +def stream_stream_service_description( behavior, request_deserializer, response_serializer): - """Constructs an interfaces.ServerRpcMethod for the given behavior. + """Creates an interfaces.RpcMethodServiceDescription for the given behavior. Args: behavior: A callable that implements a stream-stream RPC method that accepts an iterator of zero or more requests - and returns an iterator of zero or more responses. + and an interfaces.RpcContext and returns an iterator of + zero or more responses. request_deserializer: A callable that when called on a bytestring returns the request value corresponding to that bytestring. @@ -256,10 +260,10 @@ def stream_stream_server_rpc_method( that value. Returns: - An interfaces.ServerRpcMethod constructed from the given + An interfaces.RpcMethodServiceDescription constructed from the given arguments representing a streaming-request/streaming-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.STREAM_STREAM, None, None, None, behavior, None, request_deserializer, response_serializer, None) diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh index fe40b511860..2ef5b9eb232 100755 --- a/tools/run_tests/run_python.sh +++ b/tools/run_tests/run_python.sh @@ -45,6 +45,7 @@ python2.7 -B -m grpc._adapter._future_invocation_asynchronous_event_service_test python2.7 -B -m grpc._adapter._links_test python2.7 -B -m grpc._adapter._lonely_rear_link_test python2.7 -B -m grpc._adapter._low_test +python2.7 -B -m grpc.early_adopter.implementations_test python2.7 -B -m grpc.framework.assembly.implementations_test python2.7 -B -m grpc.framework.base.packets.implementations_test python2.7 -B -m grpc.framework.face.blocking_invocation_inline_service_test