Final changes to the early_adopter API.

This makes grpc.early_adopter much more independent of RPC
Framework and cleaner at the cost of reexporting most of the
interfaces and writing several delegation classes.
pull/785/head
Nathaniel Manista 10 years ago
parent 517aa0c535
commit 9280790a5d
  1. 34
      src/python/interop/interop/methods.py
  2. 168
      src/python/src/grpc/early_adopter/_assembly_utilities.py
  3. 178
      src/python/src/grpc/early_adopter/_face_utilities.py
  4. 207
      src/python/src/grpc/early_adopter/_reexport.py
  5. 48
      src/python/src/grpc/early_adopter/exceptions.py
  6. 145
      src/python/src/grpc/early_adopter/implementations.py
  7. 176
      src/python/src/grpc/early_adopter/implementations_test.py
  8. 271
      src/python/src/grpc/early_adopter/interfaces.py
  9. 132
      src/python/src/grpc/early_adopter/utilities.py
  10. 1
      tools/run_tests/run_python.sh

@ -34,47 +34,47 @@ from grpc.early_adopter import utilities
from interop import empty_pb2
from interop import messages_pb2
def _empty_call(request):
def _empty_call(request, unused_context):
return empty_pb2.Empty()
_CLIENT_EMPTY_CALL = utilities.unary_unary_client_rpc_method(
_CLIENT_EMPTY_CALL = utilities.unary_unary_invocation_description(
empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString)
_SERVER_EMPTY_CALL = utilities.unary_unary_server_rpc_method(
_SERVER_EMPTY_CALL = utilities.unary_unary_service_description(
_empty_call, empty_pb2.Empty.FromString,
empty_pb2.Empty.SerializeToString)
def _unary_call(request):
def _unary_call(request, unused_context):
return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE,
body=b'\x00' * request.response_size))
_CLIENT_UNARY_CALL = utilities.unary_unary_client_rpc_method(
_CLIENT_UNARY_CALL = utilities.unary_unary_invocation_description(
messages_pb2.SimpleRequest.SerializeToString,
messages_pb2.SimpleResponse.FromString)
_SERVER_UNARY_CALL = utilities.unary_unary_server_rpc_method(
_SERVER_UNARY_CALL = utilities.unary_unary_service_description(
_unary_call, messages_pb2.SimpleRequest.FromString,
messages_pb2.SimpleResponse.SerializeToString)
def _streaming_output_call(request):
def _streaming_output_call(request, unused_context):
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
type=request.response_type,
body=b'\x00' * response_parameters.size))
_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_client_rpc_method(
_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_invocation_description(
messages_pb2.StreamingOutputCallRequest.SerializeToString,
messages_pb2.StreamingOutputCallResponse.FromString)
_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_server_rpc_method(
_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_service_description(
_streaming_output_call,
messages_pb2.StreamingOutputCallRequest.FromString,
messages_pb2.StreamingOutputCallResponse.SerializeToString)
def _streaming_input_call(request_iterator):
def _streaming_input_call(request_iterator, unused_context):
aggregate_size = 0
for request in request_iterator:
if request.payload and request.payload.body:
@ -82,35 +82,35 @@ def _streaming_input_call(request_iterator):
return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size)
_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_client_rpc_method(
_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_invocation_description(
messages_pb2.StreamingInputCallRequest.SerializeToString,
messages_pb2.StreamingInputCallResponse.FromString)
_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_server_rpc_method(
_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_service_description(
_streaming_input_call,
messages_pb2.StreamingInputCallRequest.FromString,
messages_pb2.StreamingInputCallResponse.SerializeToString)
def _full_duplex_call(request_iterator):
def _full_duplex_call(request_iterator, unused_context):
for request in request_iterator:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
type=request.payload.type,
body=b'\x00' * request.response_parameters[0].size))
_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_client_rpc_method(
_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_invocation_description(
messages_pb2.StreamingOutputCallRequest.SerializeToString,
messages_pb2.StreamingOutputCallResponse.FromString)
_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_server_rpc_method(
_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_service_description(
_full_duplex_call,
messages_pb2.StreamingOutputCallRequest.FromString,
messages_pb2.StreamingOutputCallResponse.SerializeToString)
# NOTE(nathaniel): Apparently this is the same as the full-duplex call?
_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_client_rpc_method(
_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_invocation_description(
messages_pb2.StreamingOutputCallRequest.SerializeToString,
messages_pb2.StreamingOutputCallResponse.FromString)
_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_server_rpc_method(
_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_service_description(
_full_duplex_call,
messages_pb2.StreamingOutputCallRequest.FromString,
messages_pb2.StreamingOutputCallResponse.SerializeToString)

@ -0,0 +1,168 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import abc
import collections
# assembly_interfaces is referenced from specification in this module.
from grpc.framework.assembly import interfaces as assembly_interfaces # pylint: disable=unused-import
from grpc.framework.assembly import utilities as assembly_utilities
from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
# TODO(issue 726): Kill the "implementations" attribute of this in favor
# of the same-information-less-bogusly-represented "cardinalities".
class InvocationBreakdown(object):
"""An intermediate representation of invocation-side views of RPC methods.
Attributes:
cardinalities: A dictionary from RPC method name to interfaces.Cardinality
value.
implementations: A dictionary from RPC method name to
assembly_interfaces.MethodImplementation describing the method.
request_serializers: A dictionary from RPC method name to callable
behavior to be used serializing request values for the RPC.
response_deserializers: A dictionary from RPC method name to callable
behavior to be used deserializing response values for the RPC.
"""
__metaclass__ = abc.ABCMeta
class _EasyInvocationBreakdown(
InvocationBreakdown,
collections.namedtuple(
'_EasyInvocationBreakdown',
('cardinalities', 'implementations', 'request_serializers',
'response_deserializers'))):
pass
class ServiceBreakdown(object):
"""An intermediate representation of service-side views of RPC methods.
Attributes:
implementations: A dictionary from RPC method name
assembly_interfaces.MethodImplementation implementing the RPC method.
request_deserializers: A dictionary from RPC method name to callable
behavior to be used deserializing request values for the RPC.
response_serializers: A dictionary from RPC method name to callable
behavior to be used serializing response values for the RPC.
"""
__metaclass__ = abc.ABCMeta
class _EasyServiceBreakdown(
ServiceBreakdown,
collections.namedtuple(
'_EasyServiceBreakdown',
('implementations', 'request_deserializers', 'response_serializers'))):
pass
def break_down_invocation(method_descriptions):
"""Derives an InvocationBreakdown from several RPC method descriptions.
Args:
method_descriptions: A dictionary from RPC method name to
interfaces.RpcMethodInvocationDescription describing the RPCs.
Returns:
An InvocationBreakdown corresponding to the given method descriptions.
"""
cardinalities = {}
implementations = {}
request_serializers = {}
response_deserializers = {}
for name, method_description in method_descriptions.iteritems():
cardinality = method_description.cardinality()
cardinalities[name] = cardinality
if cardinality is interfaces.Cardinality.UNARY_UNARY:
implementations[name] = assembly_utilities.unary_unary_inline(None)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
implementations[name] = assembly_utilities.unary_stream_inline(None)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
implementations[name] = assembly_utilities.stream_unary_inline(None)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
implementations[name] = assembly_utilities.stream_stream_inline(None)
request_serializers[name] = method_description.serialize_request
response_deserializers[name] = method_description.deserialize_response
return _EasyInvocationBreakdown(
cardinalities, implementations, request_serializers,
response_deserializers)
def break_down_service(method_descriptions):
"""Derives a ServiceBreakdown from several RPC method descriptions.
Args:
method_descriptions: A dictionary from RPC method name to
interfaces.RpcMethodServiceDescription describing the RPCs.
Returns:
A ServiceBreakdown corresponding to the given method descriptions.
"""
implementations = {}
request_deserializers = {}
response_serializers = {}
for name, method_description in method_descriptions.iteritems():
cardinality = method_description.cardinality()
if cardinality is interfaces.Cardinality.UNARY_UNARY:
def service(
request, face_rpc_context,
service_behavior=method_description.service_unary_unary):
return service_behavior(
request, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.unary_unary_inline(service)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
def service(
request, face_rpc_context,
service_behavior=method_description.service_unary_stream):
return service_behavior(
request, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.unary_stream_inline(service)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
def service(
request_iterator, face_rpc_context,
service_behavior=method_description.service_stream_unary):
return service_behavior(
request_iterator, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.stream_unary_inline(service)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
def service(
request_iterator, face_rpc_context,
service_behavior=method_description.service_stream_stream):
return service_behavior(
request_iterator, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.stream_stream_inline(service)
request_deserializers[name] = method_description.deserialize_request
response_serializers[name] = method_description.serialize_response
return _EasyServiceBreakdown(
implementations, request_deserializers, response_serializers)

@ -1,178 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import abc
import collections
from grpc.framework.face import interfaces as face_interfaces
from grpc.early_adopter import interfaces
class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod):
def __init__(self, unary_unary_server_rpc_method):
self._method = unary_unary_server_rpc_method
def service(self, request, context):
"""See face_interfaces.InlineValueInValueOutMethod.service for spec."""
return self._method.service_unary_unary(request)
class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod):
def __init__(self, unary_stream_server_rpc_method):
self._method = unary_stream_server_rpc_method
def service(self, request, context):
"""See face_interfaces.InlineValueInStreamOutMethod.service for spec."""
return self._method.service_unary_stream(request)
class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod):
def __init__(self, stream_unary_server_rpc_method):
self._method = stream_unary_server_rpc_method
def service(self, request_iterator, context):
"""See face_interfaces.InlineStreamInValueOutMethod.service for spec."""
return self._method.service_stream_unary(request_iterator)
class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod):
def __init__(self, stream_stream_server_rpc_method):
self._method = stream_stream_server_rpc_method
def service(self, request_iterator, context):
"""See face_interfaces.InlineStreamInStreamOutMethod.service for spec."""
return self._method.service_stream_stream(request_iterator)
class ClientBreakdown(object):
"""An intermediate representation of invocation-side views of RPC methods.
Attributes:
request_serializers: A dictionary from RPC method name to callable
behavior to be used serializing request values for the RPC.
response_deserializers: A dictionary from RPC method name to callable
behavior to be used deserializing response values for the RPC.
"""
__metaclass__ = abc.ABCMeta
class _EasyClientBreakdown(
ClientBreakdown,
collections.namedtuple(
'_EasyClientBreakdown',
('request_serializers', 'response_deserializers'))):
pass
class ServerBreakdown(object):
"""An intermediate representation of implementations of RPC methods.
Attributes:
unary_unary_methods: A dictionary from RPC method name to callable
behavior implementing the RPC method for unary-unary RPC methods.
unary_stream_methods: A dictionary from RPC method name to callable
behavior implementing the RPC method for unary-stream RPC methods.
stream_unary_methods: A dictionary from RPC method name to callable
behavior implementing the RPC method for stream-unary RPC methods.
stream_stream_methods: A dictionary from RPC method name to callable
behavior implementing the RPC method for stream-stream RPC methods.
request_deserializers: A dictionary from RPC method name to callable
behavior to be used deserializing request values for the RPC.
response_serializers: A dictionary from RPC method name to callable
behavior to be used serializing response values for the RPC.
"""
__metaclass__ = abc.ABCMeta
class _EasyServerBreakdown(
ServerBreakdown,
collections.namedtuple(
'_EasyServerBreakdown',
('unary_unary_methods', 'unary_stream_methods', 'stream_unary_methods',
'stream_stream_methods', 'request_deserializers',
'response_serializers'))):
pass
def client_break_down(methods):
"""Derives a ClientBreakdown from several interfaces.ClientRpcMethods.
Args:
methods: A dictionary from RPC mthod name to
interfaces.ClientRpcMethod object describing the RPCs.
Returns:
A ClientBreakdown corresponding to the given methods.
"""
request_serializers = {}
response_deserializers = {}
for name, method in methods.iteritems():
request_serializers[name] = method.serialize_request
response_deserializers[name] = method.deserialize_response
return _EasyClientBreakdown(request_serializers, response_deserializers)
def server_break_down(methods):
"""Derives a ServerBreakdown from several interfaces.ServerRpcMethods.
Args:
methods: A dictionary from RPC mthod name to
interfaces.ServerRpcMethod object describing the RPCs.
Returns:
A ServerBreakdown corresponding to the given methods.
"""
unary_unary = {}
unary_stream = {}
stream_unary = {}
stream_stream = {}
request_deserializers = {}
response_serializers = {}
for name, method in methods.iteritems():
cardinality = method.cardinality()
if cardinality is interfaces.Cardinality.UNARY_UNARY:
unary_unary[name] = _InlineUnaryUnaryMethod(method)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
unary_stream[name] = _InlineUnaryStreamMethod(method)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
stream_unary[name] = _InlineStreamUnaryMethod(method)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
stream_stream[name] = _InlineStreamStreamMethod(method)
request_deserializers[name] = method.deserialize_request
response_serializers[name] = method.serialize_response
return _EasyServerBreakdown(
unary_unary, unary_stream, stream_unary, stream_stream,
request_deserializers, response_serializers)

@ -0,0 +1,207 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import abc
import collections
from grpc.framework.face import exceptions as face_exceptions
from grpc.framework.face import interfaces as face_interfaces
from grpc.framework.foundation import future
from grpc.early_adopter import exceptions
from grpc.early_adopter import interfaces
_ABORTION_REEXPORT = {
face_interfaces.Abortion.CANCELLED: interfaces.Abortion.CANCELLED,
face_interfaces.Abortion.EXPIRED: interfaces.Abortion.EXPIRED,
face_interfaces.Abortion.NETWORK_FAILURE:
interfaces.Abortion.NETWORK_FAILURE,
face_interfaces.Abortion.SERVICED_FAILURE:
interfaces.Abortion.SERVICED_FAILURE,
face_interfaces.Abortion.SERVICER_FAILURE:
interfaces.Abortion.SERVICER_FAILURE,
}
class _RpcError(exceptions.RpcError):
pass
def _reexport_error(face_rpc_error):
if isinstance(face_rpc_error, face_exceptions.CancellationError):
return exceptions.CancellationError()
elif isinstance(face_rpc_error, face_exceptions.ExpirationError):
return exceptions.ExpirationError()
else:
return _RpcError()
def _as_face_abortion_callback(abortion_callback):
def face_abortion_callback(face_abortion):
abortion_callback(_ABORTION_REEXPORT[face_abortion])
return face_abortion_callback
class _ReexportedFuture(future.Future):
def __init__(self, face_future):
self._face_future = face_future
def cancel(self):
return self._face_future.cancel()
def cancelled(self):
return self._face_future.cancelled()
def running(self):
return self._face_future.running()
def done(self):
return self._face_future.done()
def result(self, timeout=None):
try:
return self._face_future.result(timeout=timeout)
except face_exceptions.RpcError as e:
raise _reexport_error(e)
def exception(self, timeout=None):
face_error = self._face_future.exception(timeout=timeout)
return None if face_error is None else _reexport_error(face_error)
def traceback(self, timeout=None):
return self._face_future.traceback(timeout=timeout)
def add_done_callback(self, fn):
self._face_future.add_done_callback(lambda unused_face_future: fn(self))
def _call_reexporting_errors(behavior, *args, **kwargs):
try:
return behavior(*args, **kwargs)
except face_exceptions.RpcError as e:
raise _reexport_error(e)
def _reexported_future(face_future):
return _ReexportedFuture(face_future)
class _CancellableIterator(interfaces.CancellableIterator):
def __init__(self, face_cancellable_iterator):
self._face_cancellable_iterator = face_cancellable_iterator
def __iter__(self):
return self
def next(self):
return _call_reexporting_errors(self._face_cancellable_iterator.next)
def cancel(self):
self._face_cancellable_iterator.cancel()
class _RpcContext(interfaces.RpcContext):
def __init__(self, face_rpc_context):
self._face_rpc_context = face_rpc_context
def is_active(self):
return self._face_rpc_context.is_active()
def time_remaining(self):
return self._face_rpc_context.time_remaining()
def add_abortion_callback(self, abortion_callback):
self._face_rpc_context.add_abortion_callback(
_as_face_abortion_callback(abortion_callback))
class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync):
def __init__(self, face_unary_unary_sync_async):
self._underlying = face_unary_unary_sync_async
def __call__(self, request, timeout):
return _call_reexporting_errors(
self._underlying, request, timeout)
def async(self, request, timeout):
return _ReexportedFuture(self._underlying.async(request, timeout))
class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
def __init__(self, face_stream_unary_sync_async):
self._underlying = face_stream_unary_sync_async
def __call__(self, request_iterator, timeout):
return _call_reexporting_errors(
self._underlying, request_iterator, timeout)
def async(self, request_iterator, timeout):
return _ReexportedFuture(self._underlying.async(request_iterator, timeout))
class _Stub(interfaces.Stub):
def __init__(self, assembly_stub, cardinalities):
self._assembly_stub = assembly_stub
self._cardinalities = cardinalities
def __enter__(self):
self._assembly_stub.__enter__()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._assembly_stub.__exit__(exc_type, exc_val, exc_tb)
return False
def __getattr__(self, attr):
underlying_attr = self._assembly_stub.__getattr__(attr)
cardinality = self._cardinalities.get(attr)
if cardinality is interfaces.Cardinality.UNARY_UNARY:
return _UnaryUnarySyncAsync(underlying_attr)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
return lambda request, timeout: _CancellableIterator(
underlying_attr(request, timeout))
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
return _StreamUnarySyncAsync(underlying_attr)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
return lambda request_iterator, timeout: _CancellableIterator(
underlying_attr(request_iterator, timeout))
else:
raise AttributeError(attr)
def rpc_context(face_rpc_context):
return _RpcContext(face_rpc_context)
def stub(assembly_stub, cardinalities):
return _Stub(assembly_stub, cardinalities)

@ -0,0 +1,48 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Exceptions raised by GRPC.
Only GRPC should instantiate and raise these exceptions.
"""
import abc
class RpcError(Exception):
"""Common super type for all exceptions raised by GRPC."""
__metaclass__ = abc.ABCMeta
class CancellationError(RpcError):
"""Indicates that an RPC has been cancelled."""
class ExpirationError(RpcError):
"""Indicates that an RPC has expired ("timed out")."""

@ -31,15 +31,12 @@
import threading
from grpc._adapter import fore
from grpc.framework.base.packets import implementations as _tickets_implementations
from grpc.framework.face import implementations as _face_implementations
from grpc.framework.foundation import logging_pool
from grpc.early_adopter import _face_utilities
from grpc._adapter import fore as _fore
from grpc._adapter import rear as _rear
from grpc.early_adopter import _assembly_utilities
from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
_MEGA_TIMEOUT = 60 * 60 * 24
_THREAD_POOL_SIZE = 80
from grpc.framework.assembly import implementations as _assembly_implementations
class _Server(interfaces.Server):
@ -48,63 +45,120 @@ class _Server(interfaces.Server):
self._lock = threading.Lock()
self._breakdown = breakdown
self._port = port
self._private_key = private_key
self._certificate_chain = certificate_chain
if private_key is None or certificate_chain is None:
self._key_chain_pairs = ()
else:
self._key_chain_pairs = ((private_key, certificate_chain),)
self._pool = None
self._fore_link = None
self._back = None
self._server = None
def start(self):
"""See interfaces.Server.start for specification."""
def _start(self):
with self._lock:
if self._pool is None:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
servicer = _face_implementations.servicer(
self._pool,
inline_value_in_value_out_methods=self._breakdown.unary_unary_methods,
inline_value_in_stream_out_methods=self._breakdown.unary_stream_methods,
inline_stream_in_value_out_methods=self._breakdown.stream_unary_methods,
inline_stream_in_stream_out_methods=self._breakdown.stream_stream_methods)
self._fore_link = fore.ForeLink(
self._pool, self._breakdown.request_deserializers,
self._breakdown.response_serializers, None,
((self._private_key, self._certificate_chain),), port=self._port)
self._fore_link.start()
port = self._fore_link.port()
self._back = _tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT,
_MEGA_TIMEOUT)
self._fore_link.join_rear_link(self._back)
self._back.join_fore_link(self._fore_link)
return port
if self._server is None:
self._fore_link = _fore.activated_fore_link(
self._port, self._breakdown.request_deserializers,
self._breakdown.response_serializers, None, self._key_chain_pairs)
self._server = _assembly_implementations.assemble_service(
self._breakdown.implementations, self._fore_link)
self._server.start()
else:
raise ValueError('Server currently running!')
def stop(self):
"""See interfaces.Server.stop for specification."""
def _stop(self):
with self._lock:
if self._pool is None:
if self._server is None:
raise ValueError('Server not running!')
else:
self._fore_link.stop()
self._pool.shutdown(wait=True)
self._pool = None
self._server.stop()
self._server = None
self._fore_link = None
def __enter__(self):
self._start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
self._start()
def stop(self):
self._stop()
def port(self):
with self._lock:
return self._fore_link.port()
def _build_stub(
methods, host, port, root_certificates, private_key, certificate_chain):
breakdown = _assembly_utilities.break_down_invocation(methods)
# TODO(nathaniel): pass security values.
activated_rear_link = _rear.activated_rear_link(
host, port, breakdown.request_serializers,
breakdown.response_deserializers)
assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub(
breakdown.implementations, activated_rear_link)
return _reexport.stub(assembly_stub, breakdown.cardinalities)
def _build_server(methods, port, private_key, certificate_chain):
breakdown = _face_utilities.server_break_down(methods)
breakdown = _assembly_utilities.break_down_service(methods)
return _Server(breakdown, port, private_key, certificate_chain)
def insecure_stub(methods, host, port):
"""Constructs an insecure interfaces.Stub.
Args:
methods: A dictionary from RPC method name to
interfaces.RpcMethodInvocationDescription describing the RPCs to be
supported by the created stub.
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
Returns:
An interfaces.Stub affording RPC invocation.
"""
return _build_stub(methods, host, port, None, None, None)
def secure_stub(
methods, host, port, root_certificates, private_key, certificate_chain):
"""Constructs an insecure interfaces.Stub.
Args:
methods: A dictionary from RPC method name to
interfaces.RpcMethodInvocationDescription describing the RPCs to be
supported by the created stub.
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
root_certificates: The PEM-encoded root certificates or None to ask for
them to be retrieved from a default location.
private_key: The PEM-encoded private key to use or None if no private key
should be used.
certificate_chain: The PEM-encoded certificate chain to use or None if no
certificate chain should be used.
Returns:
An interfaces.Stub affording RPC invocation.
"""
return _build_stub(
methods, host, port, root_certificates, private_key, certificate_chain)
def insecure_server(methods, port):
"""Constructs an insecure interfaces.Server.
Args:
methods: A dictionary from RPC method name to
interfaces.ServerRpcMethod object describing the RPCs to
interfaces.RpcMethodServiceDescription describing the RPCs to
be serviced by the created server.
port: The port on which to serve.
port: The desired port on which to serve or zero to ask for a port to
be automatically selected.
Returns:
An interfaces.Server that will run with no security and
@ -118,9 +172,10 @@ def secure_server(methods, port, private_key, certificate_chain):
Args:
methods: A dictionary from RPC method name to
interfaces.ServerRpcMethod object describing the RPCs to
interfaces.RpcMethodServiceDescription describing the RPCs to
be serviced by the created server.
port: The port on which to serve.
port: The port on which to serve or zero to ask for a port to be
automatically selected.
private_key: A pem-encoded private key.
certificate_chain: A pem-encoded certificate chain.

@ -0,0 +1,176 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# TODO(nathaniel): Expand this test coverage.
"""Test of the GRPC-backed ForeLink and RearLink."""
import unittest
from grpc.early_adopter import implementations
from grpc.early_adopter import utilities
from grpc._junkdrawer import math_pb2
DIV = 'Div'
DIV_MANY = 'DivMany'
FIB = 'Fib'
SUM = 'Sum'
def _fibbonacci(limit):
left, right = 0, 1
for _ in xrange(limit):
yield left
left, right = right, left + right
def _div(request, unused_context):
return math_pb2.DivReply(
quotient=request.dividend / request.divisor,
remainder=request.dividend % request.divisor)
def _div_many(request_iterator, unused_context):
for request in request_iterator:
yield math_pb2.DivReply(
quotient=request.dividend / request.divisor,
remainder=request.dividend % request.divisor)
def _fib(request, unused_context):
for number in _fibbonacci(request.limit):
yield math_pb2.Num(num=number)
def _sum(request_iterator, unused_context):
accumulation = 0
for request in request_iterator:
accumulation += request.num
return math_pb2.Num(num=accumulation)
_INVOCATION_DESCRIPTIONS = {
DIV: utilities.unary_unary_invocation_description(
math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString),
DIV_MANY: utilities.stream_stream_invocation_description(
math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString),
FIB: utilities.unary_stream_invocation_description(
math_pb2.FibArgs.SerializeToString, math_pb2.Num.FromString),
SUM: utilities.stream_unary_invocation_description(
math_pb2.Num.SerializeToString, math_pb2.Num.FromString),
}
_SERVICE_DESCRIPTIONS = {
DIV: utilities.unary_unary_service_description(
_div, math_pb2.DivArgs.FromString,
math_pb2.DivReply.SerializeToString),
DIV_MANY: utilities.stream_stream_service_description(
_div_many, math_pb2.DivArgs.FromString,
math_pb2.DivReply.SerializeToString),
FIB: utilities.unary_stream_service_description(
_fib, math_pb2.FibArgs.FromString, math_pb2.Num.SerializeToString),
SUM: utilities.stream_unary_service_description(
_sum, math_pb2.Num.FromString, math_pb2.Num.SerializeToString),
}
_TIMEOUT = 3
class EarlyAdopterImplementationsTest(unittest.TestCase):
def setUp(self):
self.server = implementations.insecure_server(_SERVICE_DESCRIPTIONS, 0)
self.server.start()
port = self.server.port()
self.stub = implementations.insecure_stub(_INVOCATION_DESCRIPTIONS, 'localhost', port)
def tearDown(self):
self.server.stop()
def testUpAndDown(self):
with self.stub:
pass
def testUnaryUnary(self):
divisor = 59
dividend = 973
expected_quotient = dividend / divisor
expected_remainder = dividend % divisor
with self.stub:
response = self.stub.Div(
math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT)
self.assertEqual(expected_quotient, response.quotient)
self.assertEqual(expected_remainder, response.remainder)
def testUnaryStream(self):
stream_length = 43
with self.stub:
response_iterator = self.stub.Fib(
math_pb2.FibArgs(limit=stream_length), _TIMEOUT)
numbers = tuple(response.num for response in response_iterator)
for early, middle, later in zip(numbers, numbers[:1], numbers[:2]):
self.assertEqual(early + middle, later)
self.assertEqual(stream_length, len(numbers))
def testStreamUnary(self):
stream_length = 127
with self.stub:
response_future = self.stub.Sum.async(
(math_pb2.Num(num=index) for index in range(stream_length)),
_TIMEOUT)
self.assertEqual(
(stream_length * (stream_length - 1)) / 2,
response_future.result().num)
def testStreamStream(self):
stream_length = 179
divisor_offset = 71
dividend_offset = 1763
with self.stub:
response_iterator = self.stub.DivMany(
(math_pb2.DivArgs(
divisor=divisor_offset + index,
dividend=dividend_offset + index)
for index in range(stream_length)),
_TIMEOUT)
for index, response in enumerate(response_iterator):
self.assertEqual(
(dividend_offset + index) / (divisor_offset + index),
response.quotient)
self.assertEqual(
(dividend_offset + index) % (divisor_offset + index),
response.remainder)
self.assertEqual(stream_length, index + 1)
if __name__ == '__main__':
unittest.main()

@ -32,6 +32,11 @@
import abc
import enum
# exceptions is referenced from specification in this module.
from grpc.early_adopter import exceptions # pylint: disable=unused-import
from grpc.framework.foundation import activated
from grpc.framework.foundation import future
@enum.unique
class Cardinality(enum.Enum):
@ -43,24 +48,166 @@ class Cardinality(enum.Enum):
STREAM_STREAM = 'request-streaming/response-streaming'
class RpcMethod(object):
"""A type for the common aspects of RPC method specifications."""
@enum.unique
class Abortion(enum.Enum):
"""Categories of RPC abortion."""
CANCELLED = 'cancelled'
EXPIRED = 'expired'
NETWORK_FAILURE = 'network failure'
SERVICED_FAILURE = 'serviced failure'
SERVICER_FAILURE = 'servicer failure'
class CancellableIterator(object):
"""Implements the Iterator protocol and affords a cancel method."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __iter__(self):
"""Returns the self object in accordance with the Iterator protocol."""
raise NotImplementedError()
@abc.abstractmethod
def next(self):
"""Returns a value or raises StopIteration per the Iterator protocol."""
raise NotImplementedError()
@abc.abstractmethod
def cancel(self):
"""Requests cancellation of whatever computation underlies this iterator."""
raise NotImplementedError()
class RpcContext(object):
"""Provides RPC-related information and action."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def is_active(self):
"""Describes whether the RPC is active or has terminated."""
raise NotImplementedError()
@abc.abstractmethod
def time_remaining(self):
"""Describes the length of allowed time remaining for the RPC.
Returns:
A nonnegative float indicating the length of allowed time in seconds
remaining for the RPC to complete before it is considered to have timed
out.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_abortion_callback(self, abortion_callback):
"""Registers a callback to be called if the RPC is aborted.
Args:
abortion_callback: A callable to be called and passed an Abortion value
in the event of RPC abortion.
"""
raise NotImplementedError()
class UnaryUnarySyncAsync(object):
"""Affords invoking a unary-unary RPC synchronously or asynchronously.
Values implementing this interface are directly callable and present an
"async" method. Both calls take a request value and a numeric timeout.
Direct invocation of a value of this type invokes its associated RPC and
blocks until the RPC's response is available. Calling the "async" method
of a value of this type invokes its associated RPC and immediately returns a
future.Future bound to the asynchronous execution of the RPC.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request, timeout):
"""Synchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
The response value for the RPC.
Raises:
exceptions.RpcError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def async(self, request, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A future.Future representing the RPC. In the event of RPC completion, the
returned Future's result value will be the response value of the RPC.
In the event of RPC abortion, the returned Future's exception value
will be an exceptions.RpcError.
"""
raise NotImplementedError()
class StreamUnarySyncAsync(object):
"""Affords invoking a stream-unary RPC synchronously or asynchronously.
Values implementing this interface are directly callable and present an
"async" method. Both calls take an iterator of request values and a numeric
timeout. Direct invocation of a value of this type invokes its associated RPC
and blocks until the RPC's response is available. Calling the "async" method
of a value of this type invokes its associated RPC and immediately returns a
future.Future bound to the asynchronous execution of the RPC.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __call__(self, request_iterator, timeout):
"""Synchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
The response value for the RPC.
Raises:
exceptions.RpcError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def async(self, request_iterator, timeout):
"""Asynchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A future.Future representing the RPC. In the event of RPC completion, the
returned Future's result value will be the response value of the RPC.
In the event of RPC abortion, the returned Future's exception value
will be an exceptions.RpcError.
"""
raise NotImplementedError()
class RpcMethodDescription(object):
"""A type for the common aspects of RPC method descriptions."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def cardinality(self):
"""Identifies the cardinality of this RpcMethod.
"""Identifies the cardinality of this RpcMethodDescription.
Returns:
A Cardinality value identifying whether or not this
RpcMethod is request-unary or request-streaming and
whether or not it is response-unary or
response-streaming.
RpcMethodDescription is request-unary or request-streaming and
whether or not it is response-unary or response-streaming.
"""
raise NotImplementedError()
class ClientRpcMethod(RpcMethod):
class RpcMethodInvocationDescription(RpcMethodDescription):
"""Invocation-side description of an RPC method."""
__metaclass__ = abc.ABCMeta
@ -69,7 +216,8 @@ class ClientRpcMethod(RpcMethod):
"""Serializes a request value.
Args:
request: A request value appropriate for this RpcMethod.
request: A request value appropriate for the RPC method described by this
RpcMethodInvocationDescription.
Returns:
The serialization of the given request value as a
@ -82,9 +230,9 @@ class ClientRpcMethod(RpcMethod):
"""Deserializes a response value.
Args:
serialized_response: A bytestring that is the
serialization of a response value appropriate for this
RpcMethod.
serialized_response: A bytestring that is the serialization of a response
value appropriate for the RPC method described by this
RpcMethodInvocationDescription.
Returns:
A response value corresponding to the given bytestring.
@ -92,7 +240,7 @@ class ClientRpcMethod(RpcMethod):
raise NotImplementedError()
class ServerRpcMethod(RpcMethod):
class RpcMethodServiceDescription(RpcMethodDescription):
"""Service-side description of an RPC method."""
__metaclass__ = abc.ABCMeta
@ -101,9 +249,9 @@ class ServerRpcMethod(RpcMethod):
"""Deserializes a request value.
Args:
serialized_request: A bytestring that is the
serialization of a request value appropriate for this
RpcMethod.
serialized_request: A bytestring that is the serialization of a request
value appropriate for the RPC method described by this
RpcMethodServiceDescription.
Returns:
A request value corresponding to the given bytestring.
@ -115,7 +263,8 @@ class ServerRpcMethod(RpcMethod):
"""Serializes a response value.
Args:
response: A response value appropriate for this RpcMethod.
response: A response value appropriate for the RPC method described by
this RpcMethodServiceDescription.
Returns:
The serialization of the given response value as a
@ -124,80 +273,116 @@ class ServerRpcMethod(RpcMethod):
raise NotImplementedError()
@abc.abstractmethod
def service_unary_unary(self, request):
def service_unary_unary(self, request, context):
"""Carries out this RPC.
This method may only be called if the cardinality of this
RpcMethod is Cardinality.UNARY_UNARY.
RpcMethodServiceDescription is Cardinality.UNARY_UNARY.
Args:
request: A request value appropriate for this RpcMethod.
request: A request value appropriate for the RPC method described by this
RpcMethodServiceDescription.
context: An RpcContext object for the RPC.
Returns:
A response value appropriate for this RpcMethod.
A response value appropriate for the RPC method described by this
RpcMethodServiceDescription.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_unary_stream(self, request):
def service_unary_stream(self, request, context):
"""Carries out this RPC.
This method may only be called if the cardinality of this
RpcMethod is Cardinality.UNARY_STREAM.
RpcMethodServiceDescription is Cardinality.UNARY_STREAM.
Args:
request: A request value appropriate for this RpcMethod.
request: A request value appropriate for the RPC method described by this
RpcMethodServiceDescription.
context: An RpcContext object for the RPC.
Yields:
Zero or more response values appropriate for this
RpcMethod.
Zero or more response values appropriate for the RPC method described by
this RpcMethodServiceDescription.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_stream_unary(self, request_iterator):
def service_stream_unary(self, request_iterator, context):
"""Carries out this RPC.
This method may only be called if the cardinality of this
RpcMethod is Cardinality.STREAM_UNARY.
RpcMethodServiceDescription is Cardinality.STREAM_UNARY.
Args:
request_iterator: An iterator of request values
appropriate for this RpcMethod.
request_iterator: An iterator of request values appropriate for the RPC
method described by this RpcMethodServiceDescription.
context: An RpcContext object for the RPC.
Returns:
A response value appropriate for this RpcMethod.
A response value appropriate for the RPC method described by this
RpcMethodServiceDescription.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_stream_stream(self, request_iterator):
def service_stream_stream(self, request_iterator, context):
"""Carries out this RPC.
This method may only be called if the cardinality of this
RpcMethod is Cardinality.STREAM_STREAM.
RpcMethodServiceDescription is Cardinality.STREAM_STREAM.
Args:
request_iterator: An iterator of request values
appropriate for this RpcMethod.
request_iterator: An iterator of request values appropriate for the RPC
method described by this RpcMethodServiceDescription.
context: An RpcContext object for the RPC.
Yields:
Zero or more response values appropraite for this
RpcMethod.
Zero or more response values appropriate for the RPC method described by
this RpcMethodServiceDescription.
"""
raise NotImplementedError()
class Server(object):
class Stub(object):
"""A stub with callable RPC method names for attributes.
Instances of this type are context managers and only afford RPC invocation
when used in context.
Instances of this type, when used in context, respond to attribute access
as follows: if the requested attribute is the name of a unary-unary RPC
method, the value of the attribute will be a UnaryUnarySyncAsync with which
to invoke the RPC method. If the requested attribute is the name of a
unary-stream RPC method, the value of the attribute will be a callable taking
a request object and a timeout parameter and returning a CancellableIterator
that yields the response values of the RPC. If the requested attribute is the
name of a stream-unary RPC method, the value of the attribute will be a
StreamUnarySyncAsync with which to invoke the RPC method. If the requested
attribute is the name of a stream-stream RPC method, the value of the
attribute will be a callable taking an iterator of request objects and a
timeout and returning a CancellableIterator that yields the response values
of the RPC.
In all cases indication of abortion is indicated by raising of
exceptions.RpcError, exceptions.CancellationError,
and exceptions.ExpirationError.
"""
__metaclass__ = abc.ABCMeta
class Server(activated.Activated):
"""A GRPC Server."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def start(self):
"""Instructs this server to commence service of RPCs."""
raise NotImplementedError()
def port(self):
"""Reports the port on which the server is serving.
@abc.abstractmethod
def stop(self):
"""Instructs this server to halt service of RPCs."""
This method may only be called while the server is activated.
Returns:
The port on which the server is serving.
"""
raise NotImplementedError()

@ -32,7 +32,9 @@
from grpc.early_adopter import interfaces
class _RpcMethod(interfaces.ClientRpcMethod, interfaces.ServerRpcMethod):
class _RpcMethodDescription(
interfaces.RpcMethodInvocationDescription,
interfaces.RpcMethodServiceDescription):
def __init__(
self, cardinality, unary_unary, unary_stream, stream_unary,
@ -49,44 +51,45 @@ class _RpcMethod(interfaces.ClientRpcMethod, interfaces.ServerRpcMethod):
self._response_deserializer = response_deserializer
def cardinality(self):
"""See interfaces.RpcMethod.cardinality for specification."""
"""See interfaces.RpcMethodDescription.cardinality for specification."""
return self._cardinality
def serialize_request(self, request):
"""See interfaces.RpcMethod.serialize_request for specification."""
"""See interfaces.RpcMethodInvocationDescription.serialize_request."""
return self._request_serializer(request)
def deserialize_request(self, serialized_request):
"""See interfaces.RpcMethod.deserialize_request for specification."""
"""See interfaces.RpcMethodServiceDescription.deserialize_request."""
return self._request_deserializer(serialized_request)
def serialize_response(self, response):
"""See interfaces.RpcMethod.serialize_response for specification."""
"""See interfaces.RpcMethodServiceDescription.serialize_response."""
return self._response_serializer(response)
def deserialize_response(self, serialized_response):
"""See interfaces.RpcMethod.deserialize_response for specification."""
"""See interfaces.RpcMethodInvocationDescription.deserialize_response."""
return self._response_deserializer(serialized_response)
def service_unary_unary(self, request):
"""See interfaces.RpcMethod.service_unary_unary for specification."""
return self._unary_unary(request)
def service_unary_unary(self, request, context):
"""See interfaces.RpcMethodServiceDescription.service_unary_unary."""
return self._unary_unary(request, context)
def service_unary_stream(self, request):
"""See interfaces.RpcMethod.service_unary_stream for specification."""
return self._unary_stream(request)
def service_unary_stream(self, request, context):
"""See interfaces.RpcMethodServiceDescription.service_unary_stream."""
return self._unary_stream(request, context)
def service_stream_unary(self, request_iterator):
"""See interfaces.RpcMethod.service_stream_unary for specification."""
return self._stream_unary(request_iterator)
def service_stream_unary(self, request_iterator, context):
"""See interfaces.RpcMethodServiceDescription.service_stream_unary."""
return self._stream_unary(request_iterator, context)
def service_stream_stream(self, request_iterator):
"""See interfaces.RpcMethod.service_stream_stream for specification."""
return self._stream_stream(request_iterator)
def service_stream_stream(self, request_iterator, context):
"""See interfaces.RpcMethodServiceDescription.service_stream_stream."""
return self._stream_stream(request_iterator, context)
def unary_unary_client_rpc_method(request_serializer, response_deserializer):
"""Constructs an interfaces.ClientRpcMethod for a unary-unary RPC method.
def unary_unary_invocation_description(
request_serializer, response_deserializer):
"""Creates an interfaces.RpcMethodInvocationDescription for an RPC method.
Args:
request_serializer: A callable that when called on a request
@ -96,17 +99,17 @@ def unary_unary_client_rpc_method(request_serializer, response_deserializer):
that bytestring.
Returns:
An interfaces.ClientRpcMethod constructed from the given
arguments representing a unary-request/unary-response RPC
method.
An interfaces.RpcMethodInvocationDescription constructed from the given
arguments representing a unary-request/unary-response RPC method.
"""
return _RpcMethod(
return _RpcMethodDescription(
interfaces.Cardinality.UNARY_UNARY, None, None, None, None,
request_serializer, None, None, response_deserializer)
def unary_stream_client_rpc_method(request_serializer, response_deserializer):
"""Constructs an interfaces.ClientRpcMethod for a unary-stream RPC method.
def unary_stream_invocation_description(
request_serializer, response_deserializer):
"""Creates an interfaces.RpcMethodInvocationDescription for an RPC method.
Args:
request_serializer: A callable that when called on a request
@ -116,17 +119,17 @@ def unary_stream_client_rpc_method(request_serializer, response_deserializer):
that bytestring.
Returns:
An interfaces.ClientRpcMethod constructed from the given
arguments representing a unary-request/streaming-response
RPC method.
An interfaces.RpcMethodInvocationDescription constructed from the given
arguments representing a unary-request/streaming-response RPC method.
"""
return _RpcMethod(
return _RpcMethodDescription(
interfaces.Cardinality.UNARY_STREAM, None, None, None, None,
request_serializer, None, None, response_deserializer)
def stream_unary_client_rpc_method(request_serializer, response_deserializer):
"""Constructs an interfaces.ClientRpcMethod for a stream-unary RPC method.
def stream_unary_invocation_description(
request_serializer, response_deserializer):
"""Creates an interfaces.RpcMethodInvocationDescription for an RPC method.
Args:
request_serializer: A callable that when called on a request
@ -136,17 +139,17 @@ def stream_unary_client_rpc_method(request_serializer, response_deserializer):
that bytestring.
Returns:
An interfaces.ClientRpcMethod constructed from the given
arguments representing a streaming-request/unary-response
RPC method.
An interfaces.RpcMethodInvocationDescription constructed from the given
arguments representing a streaming-request/unary-response RPC method.
"""
return _RpcMethod(
return _RpcMethodDescription(
interfaces.Cardinality.STREAM_UNARY, None, None, None, None,
request_serializer, None, None, response_deserializer)
def stream_stream_client_rpc_method(request_serializer, response_deserializer):
"""Constructs an interfaces.ClientRpcMethod for a stream-stream RPC method.
def stream_stream_invocation_description(
request_serializer, response_deserializer):
"""Creates an interfaces.RpcMethodInvocationDescription for an RPC method.
Args:
request_serializer: A callable that when called on a request
@ -156,23 +159,23 @@ def stream_stream_client_rpc_method(request_serializer, response_deserializer):
that bytestring.
Returns:
An interfaces.ClientRpcMethod constructed from the given
arguments representing a
streaming-request/streaming-response RPC method.
An interfaces.RpcMethodInvocationDescription constructed from the given
arguments representing a streaming-request/streaming-response RPC
method.
"""
return _RpcMethod(
return _RpcMethodDescription(
interfaces.Cardinality.STREAM_STREAM, None, None, None, None,
request_serializer, None, None, response_deserializer)
def unary_unary_server_rpc_method(
def unary_unary_service_description(
behavior, request_deserializer, response_serializer):
"""Constructs an interfaces.ServerRpcMethod for the given behavior.
"""Creates an interfaces.RpcMethodServiceDescription for the given behavior.
Args:
behavior: A callable that implements a unary-unary RPC
method that accepts a single request and returns a single
response.
method that accepts a single request and an interfaces.RpcContext and
returns a single response.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
@ -181,23 +184,23 @@ def unary_unary_server_rpc_method(
that value.
Returns:
An interfaces.ServerRpcMethod constructed from the given
An interfaces.RpcMethodServiceDescription constructed from the given
arguments representing a unary-request/unary-response RPC
method.
"""
return _RpcMethod(
return _RpcMethodDescription(
interfaces.Cardinality.UNARY_UNARY, behavior, None, None, None,
None, request_deserializer, response_serializer, None)
def unary_stream_server_rpc_method(
def unary_stream_service_description(
behavior, request_deserializer, response_serializer):
"""Constructs an interfaces.ServerRpcMethod for the given behavior.
"""Creates an interfaces.RpcMethodServiceDescription for the given behavior.
Args:
behavior: A callable that implements a unary-stream RPC
method that accepts a single request and returns an
iterator of zero or more responses.
method that accepts a single request and an interfaces.RpcContext
and returns an iterator of zero or more responses.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
@ -206,23 +209,23 @@ def unary_stream_server_rpc_method(
that value.
Returns:
An interfaces.ServerRpcMethod constructed from the given
An interfaces.RpcMethodServiceDescription constructed from the given
arguments representing a unary-request/streaming-response
RPC method.
"""
return _RpcMethod(
return _RpcMethodDescription(
interfaces.Cardinality.UNARY_STREAM, None, behavior, None, None,
None, request_deserializer, response_serializer, None)
def stream_unary_server_rpc_method(
def stream_unary_service_description(
behavior, request_deserializer, response_serializer):
"""Constructs an interfaces.ServerRpcMethod for the given behavior.
"""Creates an interfaces.RpcMethodServiceDescription for the given behavior.
Args:
behavior: A callable that implements a stream-unary RPC
method that accepts an iterator of zero or more requests
and returns a single response.
and an interfaces.RpcContext and returns a single response.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
@ -231,23 +234,24 @@ def stream_unary_server_rpc_method(
that value.
Returns:
An interfaces.ServerRpcMethod constructed from the given
An interfaces.RpcMethodServiceDescription constructed from the given
arguments representing a streaming-request/unary-response
RPC method.
"""
return _RpcMethod(
return _RpcMethodDescription(
interfaces.Cardinality.STREAM_UNARY, None, None, behavior, None,
None, request_deserializer, response_serializer, None)
def stream_stream_server_rpc_method(
def stream_stream_service_description(
behavior, request_deserializer, response_serializer):
"""Constructs an interfaces.ServerRpcMethod for the given behavior.
"""Creates an interfaces.RpcMethodServiceDescription for the given behavior.
Args:
behavior: A callable that implements a stream-stream RPC
method that accepts an iterator of zero or more requests
and returns an iterator of zero or more responses.
and an interfaces.RpcContext and returns an iterator of
zero or more responses.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
@ -256,10 +260,10 @@ def stream_stream_server_rpc_method(
that value.
Returns:
An interfaces.ServerRpcMethod constructed from the given
An interfaces.RpcMethodServiceDescription constructed from the given
arguments representing a
streaming-request/streaming-response RPC method.
"""
return _RpcMethod(
return _RpcMethodDescription(
interfaces.Cardinality.STREAM_STREAM, None, None, None, behavior,
None, request_deserializer, response_serializer, None)

@ -45,6 +45,7 @@ python2.7 -B -m grpc._adapter._future_invocation_asynchronous_event_service_test
python2.7 -B -m grpc._adapter._links_test
python2.7 -B -m grpc._adapter._lonely_rear_link_test
python2.7 -B -m grpc._adapter._low_test
python2.7 -B -m grpc.early_adopter.implementations_test
python2.7 -B -m grpc.framework.assembly.implementations_test
python2.7 -B -m grpc.framework.base.packets.implementations_test
python2.7 -B -m grpc.framework.face.blocking_invocation_inline_service_test

Loading…
Cancel
Save