Eliminate the Python "assembly" package

This completes issue #726. All that cascading activation stuff just
didn't work out as cleanly as I had hoped it would.
pull/993/head
Nathaniel Manista 10 years ago
parent f9007045f2
commit 31e65beaed
  1. 87
      src/python/src/grpc/_adapter/fore.py
  2. 124
      src/python/src/grpc/_adapter/rear.py
  3. 51
      src/python/src/grpc/early_adopter/_reexport.py
  4. 152
      src/python/src/grpc/early_adopter/implementations.py
  5. 30
      src/python/src/grpc/framework/assembly/__init__.py
  6. 264
      src/python/src/grpc/framework/assembly/implementations.py
  7. 288
      src/python/src/grpc/framework/assembly/implementations_test.py
  8. 58
      src/python/src/grpc/framework/assembly/interfaces.py
  9. 12
      src/python/src/grpc/framework/face/implementations.py
  10. 1
      src/python/src/setup.py
  11. 3
      tools/run_tests/python_tests.json

@ -357,90 +357,3 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
self._complete(ticket.operation_id, ticket.payload)
else:
self._cancel(ticket.operation_id)
class _ActivatedForeLink(ticket_interfaces.ForeLink, activated.Activated):
def __init__(
self, port, request_deserializers, response_serializers,
root_certificates, key_chain_pairs):
self._port = port
self._request_deserializers = request_deserializers
self._response_serializers = response_serializers
self._root_certificates = root_certificates
self._key_chain_pairs = key_chain_pairs
self._lock = threading.Lock()
self._pool = None
self._fore_link = None
self._rear_link = null.NULL_REAR_LINK
def join_rear_link(self, rear_link):
with self._lock:
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
if self._fore_link is not None:
self._fore_link.join_rear_link(rear_link)
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._fore_link = ForeLink(
self._pool, self._request_deserializers, self._response_serializers,
self._root_certificates, self._key_chain_pairs, port=self._port)
self._fore_link.join_rear_link(self._rear_link)
self._fore_link.start()
return self
def _stop(self):
with self._lock:
self._fore_link.stop()
self._fore_link = None
self._pool.shutdown(wait=True)
self._pool = None
def __enter__(self):
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def port(self):
with self._lock:
return None if self._fore_link is None else self._fore_link.port()
def accept_back_to_front_ticket(self, ticket):
with self._lock:
if self._fore_link is not None:
self._fore_link.accept_back_to_front_ticket(ticket)
def activated_fore_link(
port, request_deserializers, response_serializers, root_certificates,
key_chain_pairs):
"""Creates a ForeLink that is also an activated.Activated.
The returned object is only valid for use between calls to its start and stop
methods (or in context when used as a context manager).
Args:
port: The port on which to serve RPCs, or None for a port to be
automatically selected.
request_deserializers: A dictionary from RPC method names to request object
deserializer behaviors.
response_serializers: A dictionary from RPC method names to response object
serializer behaviors.
root_certificates: The PEM-encoded client root certificates as a bytestring
or None.
key_chain_pairs: A sequence of PEM-encoded private key-certificate chain
pairs.
"""
return _ActivatedForeLink(
port, request_deserializers, response_serializers, root_certificates,
key_chain_pairs)

@ -387,127 +387,3 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated):
else:
# NOTE(nathaniel): All other categories are treated as cancellation.
self._cancel(ticket.operation_id)
class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated):
def __init__(
self, host, port, request_serializers, response_deserializers, secure,
root_certificates, private_key, certificate_chain,
server_host_override=None):
self._host = host
self._port = port
self._request_serializers = request_serializers
self._response_deserializers = response_deserializers
self._secure = secure
self._root_certificates = root_certificates
self._private_key = private_key
self._certificate_chain = certificate_chain
self._server_host_override = server_host_override
self._lock = threading.Lock()
self._pool = None
self._rear_link = None
self._fore_link = null.NULL_FORE_LINK
def join_fore_link(self, fore_link):
with self._lock:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
if self._rear_link is not None:
self._rear_link.join_fore_link(self._fore_link)
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._rear_link = RearLink(
self._host, self._port, self._pool, self._request_serializers,
self._response_deserializers, self._secure, self._root_certificates,
self._private_key, self._certificate_chain,
server_host_override=self._server_host_override)
self._rear_link.join_fore_link(self._fore_link)
self._rear_link.start()
return self
def _stop(self):
with self._lock:
self._rear_link.stop()
self._rear_link = None
self._pool.shutdown(wait=True)
self._pool = None
def __enter__(self):
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def accept_front_to_back_ticket(self, ticket):
with self._lock:
if self._rear_link is not None:
self._rear_link.accept_front_to_back_ticket(ticket)
# TODO(issue 726): reconcile these two creation functions.
def activated_rear_link(
host, port, request_serializers, response_deserializers):
"""Creates a RearLink that is also an activated.Activated.
The returned object is only valid for use between calls to its start and stop
methods (or in context when used as a context manager).
Args:
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
request_serializers: A dictionary from RPC method name to request object
serializer behavior.
response_deserializers: A dictionary from RPC method name to response
object deserializer behavior.
secure: A boolean indicating whether or not to use a secure connection.
root_certificates: The PEM-encoded root certificates or None to ask for
them to be retrieved from a default location.
private_key: The PEM-encoded private key to use or None if no private key
should be used.
certificate_chain: The PEM-encoded certificate chain to use or None if no
certificate chain should be used.
"""
return _ActivatedRearLink(
host, port, request_serializers, response_deserializers, False, None,
None, None)
def secure_activated_rear_link(
host, port, request_serializers, response_deserializers, root_certificates,
private_key, certificate_chain, server_host_override=None):
"""Creates a RearLink that is also an activated.Activated.
The returned object is only valid for use between calls to its start and stop
methods (or in context when used as a context manager).
Args:
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
request_serializers: A dictionary from RPC method name to request object
serializer behavior.
response_deserializers: A dictionary from RPC method name to response
object deserializer behavior.
root_certificates: The PEM-encoded root certificates or None to ask for
them to be retrieved from a default location.
private_key: The PEM-encoded private key to use or None if no private key
should be used.
certificate_chain: The PEM-encoded certificate chain to use or None if no
certificate chain should be used.
server_host_override: (For testing only) the target name used for SSL
host name checking.
"""
return _ActivatedRearLink(
host, port, request_serializers, response_deserializers, True,
root_certificates, private_key, certificate_chain,
server_host_override=server_host_override)

@ -174,45 +174,6 @@ class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
return _ReexportedFuture(self._underlying.future(request_iterator, timeout))
class _Stub(interfaces.Stub):
def __init__(self, assembly_stub, cardinalities):
self._assembly_stub = assembly_stub
self._cardinalities = cardinalities
def __enter__(self):
self._assembly_stub.__enter__()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._assembly_stub.__exit__(exc_type, exc_val, exc_tb)
return False
def __getattr__(self, attr):
underlying_attr = self._assembly_stub.__getattr__(attr)
method_cardinality = self._cardinalities.get(attr)
# TODO(nathaniel): unify this trick with its other occurrence in the code.
if method_cardinality is None:
for name, method_cardinality in self._cardinalities.iteritems():
last_slash_index = name.rfind('/')
if 0 <= last_slash_index and name[last_slash_index + 1:] == attr:
break
else:
raise AttributeError(attr)
if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
return _UnaryUnarySyncAsync(underlying_attr)
elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
return lambda request, timeout: _CancellableIterator(
underlying_attr(request, timeout))
elif method_cardinality is interfaces.Cardinality.STREAM_UNARY:
return _StreamUnarySyncAsync(underlying_attr)
elif method_cardinality is interfaces.Cardinality.STREAM_STREAM:
return lambda request_iterator, timeout: _CancellableIterator(
underlying_attr(request_iterator, timeout))
else:
raise AttributeError(attr)
def common_cardinalities(early_adopter_cardinalities):
common_cardinalities = {}
for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems():
@ -225,5 +186,13 @@ def rpc_context(face_rpc_context):
return _RpcContext(face_rpc_context)
def stub(face_stub, cardinalities):
return _Stub(face_stub, cardinalities)
def cancellable_iterator(face_cancellable_iterator):
return _CancellableIterator(face_cancellable_iterator)
def unary_unary_sync_async(face_unary_unary_multi_callable):
return _UnaryUnarySyncAsync(face_unary_unary_multi_callable)
def stream_unary_sync_async(face_stream_unary_multi_callable):
return _StreamUnarySyncAsync(face_stream_unary_multi_callable)

@ -36,7 +36,13 @@ from grpc._adapter import rear as _rear
from grpc.early_adopter import _face_utilities
from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
from grpc.framework.assembly import implementations as _assembly_implementations
from grpc.framework.base import util as _base_utilities
from grpc.framework.base.packets import implementations as _tickets_implementations
from grpc.framework.face import implementations as _face_implementations
from grpc.framework.foundation import logging_pool
_THREAD_POOL_SIZE = 80
_ONE_DAY_IN_SECONDS = 24 * 60 * 60
class _Server(interfaces.Server):
@ -50,30 +56,39 @@ class _Server(interfaces.Server):
else:
self._key_chain_pairs = ((private_key, certificate_chain),)
self._pool = None
self._back = None
self._fore_link = None
self._server = None
def _start(self):
with self._lock:
if self._server is None:
self._fore_link = _fore.activated_fore_link(
self._port, self._breakdown.request_deserializers,
if self._pool is None:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
servicer = _face_implementations.servicer(
self._pool, self._breakdown.implementations, None)
self._back = _tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
_ONE_DAY_IN_SECONDS)
self._fore_link = _fore.ForeLink(
self._pool, self._breakdown.request_deserializers,
self._breakdown.response_serializers, None, self._key_chain_pairs)
self._server = _assembly_implementations.assemble_service(
self._breakdown.implementations, self._fore_link)
self._server.start()
self._back.join_fore_link(self._fore_link)
self._fore_link.join_rear_link(self._back)
self._fore_link.start()
else:
raise ValueError('Server currently running!')
def _stop(self):
with self._lock:
if self._server is None:
if self._pool is None:
raise ValueError('Server not running!')
else:
self._server.stop()
self._server = None
self._fore_link.stop()
_base_utilities.wait_for_idle(self._back)
self._pool.shutdown(wait=True)
self._fore_link = None
self._back = None
self._pool = None
def __enter__(self):
self._start()
@ -93,11 +108,101 @@ class _Server(interfaces.Server):
with self._lock:
return self._fore_link.port()
def _build_stub(breakdown, activated_rear_link):
assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub(
_reexport.common_cardinalities(breakdown.cardinalities),
activated_rear_link)
return _reexport.stub(assembly_stub, breakdown.cardinalities)
class _Stub(interfaces.Stub):
def __init__(
self, breakdown, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=None):
self._lock = threading.Lock()
self._breakdown = breakdown
self._host = host
self._port = port
self._secure = secure
self._root_certificates = root_certificates
self._private_key = private_key
self._certificate_chain = certificate_chain
self._server_host_override = server_host_override
self._pool = None
self._front = None
self._rear_link = None
self._understub = None
def __enter__(self):
with self._lock:
if self._pool is None:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._front = _tickets_implementations.front(
self._pool, self._pool, self._pool)
self._rear_link = _rear.RearLink(
self._host, self._port, self._pool,
self._breakdown.request_serializers,
self._breakdown.response_deserializers, self._secure,
self._root_certificates, self._private_key, self._certificate_chain,
server_host_override=self._server_host_override)
self._front.join_rear_link(self._rear_link)
self._rear_link.join_fore_link(self._front)
self._rear_link.start()
self._understub = _face_implementations.dynamic_stub(
_reexport.common_cardinalities(self._breakdown.cardinalities),
self._front, self._pool, '')
else:
raise ValueError('Tried to __enter__ already-__enter__ed Stub!')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
if self._pool is None:
raise ValueError('Tried to __exit__ non-__enter__ed Stub!')
else:
self._rear_link.stop()
_base_utilities.wait_for_idle(self._front)
self._pool.shutdown(wait=True)
self._rear_link = None
self._front = None
self._pool = None
self._understub = None
return False
def __getattr__(self, attr):
with self._lock:
if self._pool is None:
raise ValueError('Tried to __getattr__ non-__enter__ed Stub!')
else:
underlying_attr = getattr(self._understub, attr, None)
method_cardinality = self._breakdown.cardinalities.get(attr)
# TODO(nathaniel): Eliminate this trick.
if underlying_attr is None:
for method_name, method_cardinality in self._breakdown.cardinalities.iteritems():
last_slash_index = method_name.rfind('/')
if 0 <= last_slash_index and method_name[last_slash_index + 1:] == attr:
underlying_attr = getattr(self._understub, method_name)
break
else:
raise AttributeError(attr)
if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
return _reexport.unary_unary_sync_async(underlying_attr)
elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
return lambda request, timeout: _reexport.cancellable_iterator(
underlying_attr(request, timeout))
elif method_cardinality is interfaces.Cardinality.STREAM_UNARY:
return _reexport.stream_unary_sync_async(underlying_attr)
elif method_cardinality is interfaces.Cardinality.STREAM_STREAM:
return lambda request_iterator, timeout: (
_reexport.cancellable_iterator(underlying_attr(
request_iterator, timeout)))
else:
raise AttributeError(attr)
def _build_stub(
methods, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=None):
breakdown = _face_utilities.break_down_invocation(methods)
return _Stub(
breakdown, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override)
def _build_server(methods, port, private_key, certificate_chain):
@ -118,11 +223,7 @@ def insecure_stub(methods, host, port):
Returns:
An interfaces.Stub affording RPC invocation.
"""
breakdown = _face_utilities.break_down_invocation(methods)
activated_rear_link = _rear.activated_rear_link(
host, port, breakdown.request_serializers,
breakdown.response_deserializers)
return _build_stub(breakdown, activated_rear_link)
return _build_stub(methods, host, port, False, None, None, None)
def secure_stub(
@ -148,12 +249,9 @@ def secure_stub(
Returns:
An interfaces.Stub affording RPC invocation.
"""
breakdown = _face_utilities.break_down_invocation(methods)
activated_rear_link = _rear.secure_activated_rear_link(
host, port, breakdown.request_serializers,
breakdown.response_deserializers, root_certificates, private_key,
return _build_stub(
methods, host, port, True, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override)
return _build_stub(breakdown, activated_rear_link)
def insecure_server(methods, port):

@ -1,30 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -1,264 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Implementations for assembling RPC framework values."""
import threading
# tickets_interfaces, face_interfaces, and activated are referenced from
# specification in this module.
from grpc.framework.assembly import interfaces
from grpc.framework.base import util as base_utilities
from grpc.framework.base.packets import implementations as tickets_implementations
from grpc.framework.base.packets import interfaces as tickets_interfaces # pylint: disable=unused-import
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.face import implementations as face_implementations
from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import
from grpc.framework.face import utilities as face_utilities
from grpc.framework.foundation import activated # pylint: disable=unused-import
from grpc.framework.foundation import logging_pool
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_THREAD_POOL_SIZE = 100
class _FaceStub(object):
def __init__(self, rear_link):
self._rear_link = rear_link
self._lock = threading.Lock()
self._pool = None
self._front = None
self._under_stub = None
def __enter__(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._front = tickets_implementations.front(
self._pool, self._pool, self._pool)
self._rear_link.start()
self._rear_link.join_fore_link(self._front)
self._front.join_rear_link(self._rear_link)
self._under_stub = face_implementations.generic_stub(self._front, self._pool)
def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
self._under_stub = None
self._rear_link.stop()
base_utilities.wait_for_idle(self._front)
self._front = None
self._pool.shutdown(wait=True)
self._pool = None
return False
def __getattr__(self, attr):
with self._lock:
if self._under_stub is None:
raise ValueError('Called out of context!')
else:
return getattr(self._under_stub, attr)
def _behaviors(method_cardinalities, front, pool):
behaviors = {}
stub = face_implementations.generic_stub(front, pool)
for name, method_cardinality in method_cardinalities.iteritems():
if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
behaviors[name] = stub.unary_unary_multi_callable(name)
elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
behaviors[name] = lambda request, context, bound_name=name: (
stub.inline_value_in_stream_out(bound_name, request, context))
elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
behaviors[name] = stub.stream_unary_multi_callable(name)
elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
behaviors[name] = lambda request_iterator, context, bound_name=name: (
stub.inline_stream_in_stream_out(
bound_name, request_iterator, context))
return behaviors
class _DynamicInlineStub(object):
def __init__(self, cardinalities, rear_link):
self._cardinalities = cardinalities
self._rear_link = rear_link
self._lock = threading.Lock()
self._pool = None
self._front = None
self._behaviors = None
def __enter__(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._front = tickets_implementations.front(
self._pool, self._pool, self._pool)
self._rear_link.start()
self._rear_link.join_fore_link(self._front)
self._front.join_rear_link(self._rear_link)
self._behaviors = _behaviors(
self._cardinalities, self._front, self._pool)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
self._behaviors = None
self._rear_link.stop()
base_utilities.wait_for_idle(self._front)
self._front = None
self._pool.shutdown(wait=True)
self._pool = None
return False
def __getattr__(self, attr):
with self._lock:
behavior = self._behaviors.get(attr)
if behavior is None:
for name, behavior in self._behaviors.iteritems():
last_slash_index = name.rfind('/')
if 0 <= last_slash_index and name[last_slash_index + 1:] == attr:
return behavior
else:
raise AttributeError(
'_DynamicInlineStub instance has no attribute "%s"!' % attr)
else:
return behavior
class _ServiceAssembly(interfaces.Server):
def __init__(self, implementations, fore_link):
self._implementations = implementations
self._fore_link = fore_link
self._lock = threading.Lock()
self._pool = None
self._back = None
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
servicer = face_implementations.servicer(
self._pool, self._implementations, None)
self._back = tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
_ONE_DAY_IN_SECONDS)
self._fore_link.start()
self._fore_link.join_rear_link(self._back)
self._back.join_fore_link(self._fore_link)
def _stop(self):
with self._lock:
self._fore_link.stop()
base_utilities.wait_for_idle(self._back)
self._back = None
self._pool.shutdown(wait=True)
self._pool = None
def __enter__(self):
self._start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def port(self):
with self._lock:
return self._fore_link.port()
def assemble_face_stub(activated_rear_link):
"""Assembles a face_interfaces.GenericStub.
The returned object is a context manager and may only be used in context to
invoke RPCs.
Args:
activated_rear_link: An object that is both a tickets_interfaces.RearLink
and an activated.Activated. The object should be in the inactive state
when passed to this method.
Returns:
A face_interfaces.GenericStub on which, in context, RPCs can be invoked.
"""
return _FaceStub(activated_rear_link)
def assemble_dynamic_inline_stub(cardinalities, activated_rear_link):
"""Assembles a stub with method names for attributes.
The returned object is a context manager and may only be used in context to
invoke RPCs.
The returned object, when used in context, will respond to attribute access
as follows: if the requested attribute is the name of a unary-unary RPC
method, the value of the attribute will be a
face_interfaces.UnaryUnaryMultiCallable with which to invoke the RPC method.
If the requested attribute is the name of a unary-stream RPC method, the
value of the attribute will be a face_interfaces.UnaryStreamMultiCallable
with which to invoke the RPC method. If the requested attribute is the name
of a stream-unary RPC method, the value of the attribute will be a
face_interfaces.StreamUnaryMultiCallable with which to invoke the RPC method.
If the requested attribute is the name of a stream-stream RPC method, the
value of the attribute will be a face_interfaces.StreamStreamMultiCallable
with which to invoke the RPC method.
Args:
cardinalities: A dictionary from RPC method name to cardinality.Cardinality
value identifying the cardinality of the named RPC method.
activated_rear_link: An object that is both a tickets_interfaces.RearLink
and an activated.Activated. The object should be in the inactive state
when passed to this method.
Returns:
A face_interfaces.DynamicStub on which, in context, RPCs can be invoked.
"""
return _DynamicInlineStub(cardinalities, activated_rear_link)
def assemble_service(implementations, activated_fore_link):
"""Assembles the service-side of the RPC Framework stack.
Args:
implementations: A dictionary from RPC method name to
face_interfaces.MethodImplementation.
activated_fore_link: An object that is both a tickets_interfaces.ForeLink
and an activated.Activated. The object should be in the inactive state
when passed to this method.
Returns:
An interfaces.Server encapsulating RPC service.
"""
return _ServiceAssembly(implementations, activated_fore_link)

@ -1,288 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# TODO(nathaniel): Expand this test coverage.
"""Test of the GRPC-backed ForeLink and RearLink."""
import threading
import unittest
from grpc.framework.assembly import implementations
from grpc.framework.base import interfaces
from grpc.framework.base.packets import packets as tickets
from grpc.framework.base.packets import interfaces as tickets_interfaces
from grpc.framework.base.packets import null
from grpc.framework.face import utilities as face_utilities
from grpc.framework.foundation import logging_pool
from grpc._junkdrawer import math_pb2
DIV = 'Div'
DIV_MANY = 'DivMany'
FIB = 'Fib'
SUM = 'Sum'
def _fibbonacci(limit):
left, right = 0, 1
for _ in xrange(limit):
yield left
left, right = right, left + right
def _div(request, unused_context):
return math_pb2.DivReply(
quotient=request.dividend / request.divisor,
remainder=request.dividend % request.divisor)
def _div_many(request_iterator, unused_context):
for request in request_iterator:
yield math_pb2.DivReply(
quotient=request.dividend / request.divisor,
remainder=request.dividend % request.divisor)
def _fib(request, unused_context):
for number in _fibbonacci(request.limit):
yield math_pb2.Num(num=number)
def _sum(request_iterator, unused_context):
accumulation = 0
for request in request_iterator:
accumulation += request.num
return math_pb2.Num(num=accumulation)
_IMPLEMENTATIONS = {
DIV: face_utilities.unary_unary_inline(_div),
DIV_MANY: face_utilities.stream_stream_inline(_div_many),
FIB: face_utilities.unary_stream_inline(_fib),
SUM: face_utilities.stream_unary_inline(_sum),
}
_CARDINALITIES = {
name: implementation.cardinality
for name, implementation in _IMPLEMENTATIONS.iteritems()}
_TIMEOUT = 10
class PipeLink(tickets_interfaces.ForeLink, tickets_interfaces.RearLink):
def __init__(self):
self._fore_lock = threading.Lock()
self._fore_link = null.NULL_FORE_LINK
self._rear_lock = threading.Lock()
self._rear_link = null.NULL_REAR_LINK
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return False
def start(self):
pass
def stop(self):
pass
def accept_back_to_front_ticket(self, ticket):
with self._fore_lock:
self._fore_link.accept_back_to_front_ticket(ticket)
def join_rear_link(self, rear_link):
with self._rear_lock:
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
def accept_front_to_back_ticket(self, ticket):
with self._rear_lock:
self._rear_link.accept_front_to_back_ticket(ticket)
def join_fore_link(self, fore_link):
with self._fore_lock:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
class FaceStubTest(unittest.TestCase):
def testUnaryUnary(self):
divisor = 7
dividend = 13
expected_quotient = 1
expected_remainder = 6
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
face_stub = implementations.assemble_face_stub(pipe)
service.start()
try:
with face_stub:
response = face_stub.blocking_value_in_value_out(
DIV, math_pb2.DivArgs(divisor=divisor, dividend=dividend),
_TIMEOUT)
self.assertEqual(expected_quotient, response.quotient)
self.assertEqual(expected_remainder, response.remainder)
finally:
service.stop()
def testUnaryStream(self):
stream_length = 29
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
face_stub = implementations.assemble_face_stub(pipe)
with service, face_stub:
responses = list(
face_stub.inline_value_in_stream_out(
FIB, math_pb2.FibArgs(limit=stream_length), _TIMEOUT))
numbers = [response.num for response in responses]
for early, middle, later in zip(numbers, numbers[1:], numbers[2:]):
self.assertEqual(early + middle, later)
def testStreamUnary(self):
stream_length = 13
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
face_stub = implementations.assemble_face_stub(pipe)
with service, face_stub:
multi_callable = face_stub.stream_unary_multi_callable(SUM)
response_future = multi_callable.future(
(math_pb2.Num(num=index) for index in range(stream_length)),
_TIMEOUT)
self.assertEqual(
(stream_length * (stream_length - 1)) / 2,
response_future.result().num)
def testStreamStream(self):
stream_length = 17
divisor_offset = 7
dividend_offset = 17
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
face_stub = implementations.assemble_face_stub(pipe)
with service, face_stub:
response_iterator = face_stub.inline_stream_in_stream_out(
DIV_MANY,
(math_pb2.DivArgs(
divisor=divisor_offset + index,
dividend=dividend_offset + index)
for index in range(stream_length)),
_TIMEOUT)
for index, response in enumerate(response_iterator):
self.assertEqual(
(dividend_offset + index) / (divisor_offset + index),
response.quotient)
self.assertEqual(
(dividend_offset + index) % (divisor_offset + index),
response.remainder)
self.assertEqual(stream_length, index + 1)
class DynamicInlineStubTest(unittest.TestCase):
def testUnaryUnary(self):
divisor = 59
dividend = 973
expected_quotient = dividend / divisor
expected_remainder = dividend % divisor
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_CARDINALITIES, pipe)
service.start()
with dynamic_stub:
response = dynamic_stub.Div(
math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT)
self.assertEqual(expected_quotient, response.quotient)
self.assertEqual(expected_remainder, response.remainder)
service.stop()
def testUnaryStream(self):
stream_length = 43
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_CARDINALITIES, pipe)
with service, dynamic_stub:
response_iterator = dynamic_stub.Fib(
math_pb2.FibArgs(limit=stream_length), _TIMEOUT)
numbers = tuple(response.num for response in response_iterator)
for early, middle, later in zip(numbers, numbers[:1], numbers[:2]):
self.assertEqual(early + middle, later)
self.assertEqual(stream_length, len(numbers))
def testStreamUnary(self):
stream_length = 127
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_CARDINALITIES, pipe)
with service, dynamic_stub:
response_future = dynamic_stub.Sum.future(
(math_pb2.Num(num=index) for index in range(stream_length)),
_TIMEOUT)
self.assertEqual(
(stream_length * (stream_length - 1)) / 2,
response_future.result().num)
def testStreamStream(self):
stream_length = 179
divisor_offset = 71
dividend_offset = 1763
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_CARDINALITIES, pipe)
with service, dynamic_stub:
response_iterator = dynamic_stub.DivMany(
(math_pb2.DivArgs(
divisor=divisor_offset + index,
dividend=dividend_offset + index)
for index in range(stream_length)),
_TIMEOUT)
for index, response in enumerate(response_iterator):
self.assertEqual(
(dividend_offset + index) / (divisor_offset + index),
response.quotient)
self.assertEqual(
(dividend_offset + index) % (divisor_offset + index),
response.remainder)
self.assertEqual(stream_length, index + 1)
if __name__ == '__main__':
unittest.main()

@ -1,58 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# TODO(nathaniel): The assembly layer only exists to smooth out wrinkles in
# the face layer. The two should be squashed together as soon as manageable.
"""Interfaces for assembling RPC Framework values."""
import abc
from grpc.framework.foundation import activated
class Server(activated.Activated):
"""The server interface.
Aside from being able to be activated and deactivated, objects of this type
are able to report the port on which they are servicing RPCs.
"""
__metaclass__ = abc.ABCMeta
# TODO(issue 726): This is an abstraction violation; not every Server is
# necessarily serving over a network at all.
@abc.abstractmethod
def port(self):
"""Identifies the port on which this Server is servicing RPCs.
This method may only be called while the server is active.
Returns:
The number of the port on which this Server is servicing RPCs.
"""
raise NotImplementedError()

@ -213,14 +213,14 @@ class _DynamicStub(interfaces.DynamicStub):
self._pool = pool
def __getattr__(self, attr):
cardinality = self._cardinalities.get(attr)
if cardinality is cardinality.Cardinality.UNARY_UNARY:
method_cardinality = self._cardinalities.get(attr)
if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
return _UnaryUnaryMultiCallable(self._front, attr)
elif cardinality is cardinality.Cardinality.UNARY_STREAM:
elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
return _UnaryStreamMultiCallable(self._front, attr)
elif cardinality is cardinality.Cardinality.STREAM_UNARY:
elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
return _StreamUnaryMultiCallable(self._front, attr, self._pool)
elif cardinality is cardinality.Cardinality.STREAM_STREAM:
elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
return _StreamStreamMultiCallable(self._front, attr, self._pool)
else:
raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
@ -315,4 +315,4 @@ def dynamic_stub(cardinalities, front, pool, prefix):
An interfaces.DynamicStub that performs RPCs via the given
base_interfaces.Front.
"""
return _DynamicStub(cardinalities, front, pool, prefix)
return _DynamicStub(cardinalities, front, pool)

@ -64,7 +64,6 @@ _PACKAGES = (
'grpc._junkdrawer',
'grpc.early_adopter',
'grpc.framework',
'grpc.framework.assembly',
'grpc.framework.base',
'grpc.framework.base.packets',
'grpc.framework.common',

@ -26,9 +26,6 @@
{
"module": "grpc.early_adopter.implementations_test"
},
{
"module": "grpc.framework.assembly.implementations_test"
},
{
"module": "grpc.framework.base.packets.implementations_test"
},

Loading…
Cancel
Save