Add the _framework.face package.

pull/193/head
Nathaniel Manista 10 years ago
parent 21f29c6bf7
commit 7a3c38bed9
  1. 0
      src/python/_framework/face/__init__.py
  2. 310
      src/python/_framework/face/_calls.py
  3. 194
      src/python/_framework/face/_control.py
  4. 189
      src/python/_framework/face/_service.py
  5. 81
      src/python/_framework/face/_test_case.py
  6. 46
      src/python/_framework/face/blocking_invocation_inline_service_test.py
  7. 118
      src/python/_framework/face/demonstration.py
  8. 46
      src/python/_framework/face/event_invocation_synchronous_event_service_test.py
  9. 77
      src/python/_framework/face/exceptions.py
  10. 46
      src/python/_framework/face/future_invocation_asynchronous_event_service_test.py
  11. 246
      src/python/_framework/face/implementations.py
  12. 545
      src/python/_framework/face/interfaces.py
  13. 0
      src/python/_framework/face/testing/__init__.py
  14. 102
      src/python/_framework/face/testing/base_util.py
  15. 223
      src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py
  16. 94
      src/python/_framework/face/testing/callback.py
  17. 87
      src/python/_framework/face/testing/control.py
  18. 123
      src/python/_framework/face/testing/coverage.py
  19. 446
      src/python/_framework/face/testing/digest.py
  20. 367
      src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
  21. 377
      src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
  22. 117
      src/python/_framework/face/testing/interfaces.py
  23. 70
      src/python/_framework/face/testing/serial.py
  24. 337
      src/python/_framework/face/testing/service.py
  25. 374
      src/python/_framework/face/testing/stock_service.py
  26. 111
      src/python/_framework/face/testing/test_case.py
  27. 0
      src/python/_junkdrawer/__init__.py
  28. 152
      src/python/_junkdrawer/stock_pb2.py
  29. 5
      tools/run_tests/run_python.sh

@ -0,0 +1,310 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utility functions for invoking RPCs."""
import threading
from _framework.base import interfaces as base_interfaces
from _framework.base import util as base_util
from _framework.face import _control
from _framework.face import interfaces
from _framework.foundation import callable_util
from _framework.foundation import future
_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!'
class _RendezvousServicedIngestor(base_interfaces.ServicedIngestor):
def __init__(self, rendezvous):
self._rendezvous = rendezvous
def consumer(self, operation_context):
return self._rendezvous
class _EventServicedIngestor(base_interfaces.ServicedIngestor):
def __init__(self, result_consumer, abortion_callback):
self._result_consumer = result_consumer
self._abortion_callback = abortion_callback
def consumer(self, operation_context):
operation_context.add_termination_callback(
_control.as_operation_termination_callback(self._abortion_callback))
return self._result_consumer
def _rendezvous_subscription(rendezvous):
return base_util.full_serviced_subscription(
_RendezvousServicedIngestor(rendezvous))
def _unary_event_subscription(completion_callback, abortion_callback):
return base_util.full_serviced_subscription(
_EventServicedIngestor(
_control.UnaryConsumer(completion_callback), abortion_callback))
def _stream_event_subscription(result_consumer, abortion_callback):
return base_util.full_serviced_subscription(
_EventServicedIngestor(result_consumer, abortion_callback))
class _OperationCancellableIterator(interfaces.CancellableIterator):
"""An interfaces.CancellableIterator for response-streaming operations."""
def __init__(self, rendezvous, operation):
self._rendezvous = rendezvous
self._operation = operation
def __iter__(self):
return self
def next(self):
return next(self._rendezvous)
def cancel(self):
self._operation.cancel()
self._rendezvous.set_outcome(base_interfaces.CANCELLED)
class _OperationFuture(future.Future):
"""A future.Future interface to an operation."""
def __init__(self, rendezvous, operation):
self._condition = threading.Condition()
self._rendezvous = rendezvous
self._operation = operation
self._outcome = None
self._callbacks = []
def cancel(self):
"""See future.Future.cancel for specification."""
with self._condition:
if self._outcome is None:
self._operation.cancel()
self._outcome = future.aborted()
self._condition.notify_all()
return False
def cancelled(self):
"""See future.Future.cancelled for specification."""
return False
def done(self):
"""See future.Future.done for specification."""
with self._condition:
return (self._outcome is not None and
self._outcome.category is not future.ABORTED)
def outcome(self):
"""See future.Future.outcome for specification."""
with self._condition:
while self._outcome is None:
self._condition.wait()
return self._outcome
def add_done_callback(self, callback):
"""See future.Future.add_done_callback for specification."""
with self._condition:
if self._callbacks is not None:
self._callbacks.add(callback)
return
outcome = self._outcome
callable_util.call_logging_exceptions(
callback, _DONE_CALLBACK_LOG_MESSAGE, outcome)
def on_operation_termination(self, operation_outcome):
"""Indicates to this object that the operation has terminated.
Args:
operation_outcome: One of base_interfaces.COMPLETED,
base_interfaces.CANCELLED, base_interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
base_interfaces.SERVICED_FAILURE, or base_interfaces.SERVICER_FAILURE
indicating the categorical outcome of the operation.
"""
with self._condition:
if (self._outcome is None and
operation_outcome != base_interfaces.COMPLETED):
self._outcome = future.raised(
_control.abortion_outcome_to_exception(operation_outcome))
self._condition.notify_all()
outcome = self._outcome
rendezvous = self._rendezvous
callbacks = list(self._callbacks)
self._callbacks = None
if outcome is None:
try:
return_value = next(rendezvous)
except Exception as e: # pylint: disable=broad-except
outcome = future.raised(e)
else:
outcome = future.returned(return_value)
with self._condition:
if self._outcome is None:
self._outcome = outcome
self._condition.notify_all()
else:
outcome = self._outcome
for callback in callbacks:
callable_util.call_logging_exceptions(
callback, _DONE_CALLBACK_LOG_MESSAGE, outcome)
class _Call(interfaces.Call):
def __init__(self, operation):
self._operation = operation
self.context = _control.RpcContext(operation.context)
def cancel(self):
self._operation.cancel()
def blocking_value_in_value_out(front, name, payload, timeout, trace_id):
"""Services in a blocking fashion a value-in value-out servicer method."""
rendezvous = _control.Rendezvous()
subscription = _rendezvous_subscription(rendezvous)
operation = front.operate(
name, payload, True, timeout, subscription, trace_id)
operation.context.add_termination_callback(rendezvous.set_outcome)
return next(rendezvous)
def future_value_in_value_out(front, name, payload, timeout, trace_id):
"""Services a value-in value-out servicer method by returning a Future."""
rendezvous = _control.Rendezvous()
subscription = _rendezvous_subscription(rendezvous)
operation = front.operate(
name, payload, True, timeout, subscription, trace_id)
operation.context.add_termination_callback(rendezvous.set_outcome)
operation_future = _OperationFuture(rendezvous, operation)
operation.context.add_termination_callback(
operation_future.on_operation_termination)
return operation_future
def inline_value_in_stream_out(front, name, payload, timeout, trace_id):
"""Services a value-in stream-out servicer method."""
rendezvous = _control.Rendezvous()
subscription = _rendezvous_subscription(rendezvous)
operation = front.operate(
name, payload, True, timeout, subscription, trace_id)
operation.context.add_termination_callback(rendezvous.set_outcome)
return _OperationCancellableIterator(rendezvous, operation)
def blocking_stream_in_value_out(
front, name, payload_iterator, timeout, trace_id):
"""Services in a blocking fashion a stream-in value-out servicer method."""
rendezvous = _control.Rendezvous()
subscription = _rendezvous_subscription(rendezvous)
operation = front.operate(name, None, False, timeout, subscription, trace_id)
operation.context.add_termination_callback(rendezvous.set_outcome)
for payload in payload_iterator:
operation.consumer.consume(payload)
operation.consumer.terminate()
return next(rendezvous)
def future_stream_in_value_out(
front, name, payload_iterator, timeout, trace_id, pool):
"""Services a stream-in value-out servicer method by returning a Future."""
rendezvous = _control.Rendezvous()
subscription = _rendezvous_subscription(rendezvous)
operation = front.operate(name, None, False, timeout, subscription, trace_id)
operation.context.add_termination_callback(rendezvous.set_outcome)
pool.submit(
callable_util.with_exceptions_logged(
_control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE),
payload_iterator, operation.consumer, lambda: True, True)
operation_future = _OperationFuture(rendezvous, operation)
operation.context.add_termination_callback(
operation_future.on_operation_termination)
return operation_future
def inline_stream_in_stream_out(
front, name, payload_iterator, timeout, trace_id, pool):
"""Services a stream-in stream-out servicer method."""
rendezvous = _control.Rendezvous()
subscription = _rendezvous_subscription(rendezvous)
operation = front.operate(name, None, False, timeout, subscription, trace_id)
operation.context.add_termination_callback(rendezvous.set_outcome)
pool.submit(
callable_util.with_exceptions_logged(
_control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE),
payload_iterator, operation.consumer, lambda: True, True)
return _OperationCancellableIterator(rendezvous, operation)
def event_value_in_value_out(
front, name, payload, completion_callback, abortion_callback, timeout,
trace_id):
subscription = _unary_event_subscription(
completion_callback, abortion_callback)
operation = front.operate(
name, payload, True, timeout, subscription, trace_id)
return _Call(operation)
def event_value_in_stream_out(
front, name, payload, result_payload_consumer, abortion_callback, timeout,
trace_id):
subscription = _stream_event_subscription(
result_payload_consumer, abortion_callback)
operation = front.operate(
name, payload, True, timeout, subscription, trace_id)
return _Call(operation)
def event_stream_in_value_out(
front, name, completion_callback, abortion_callback, timeout, trace_id):
subscription = _unary_event_subscription(
completion_callback, abortion_callback)
operation = front.operate(name, None, False, timeout, subscription, trace_id)
return _Call(operation), operation.consumer
def event_stream_in_stream_out(
front, name, result_payload_consumer, abortion_callback, timeout, trace_id):
subscription = _stream_event_subscription(
result_payload_consumer, abortion_callback)
operation = front.operate(name, None, False, timeout, subscription, trace_id)
return _Call(operation), operation.consumer

@ -0,0 +1,194 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for translating between sync and async control flow."""
import threading
from _framework.base import interfaces as base_interfaces
from _framework.face import exceptions
from _framework.face import interfaces
from _framework.foundation import abandonment
from _framework.foundation import stream
INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-('
_OPERATION_OUTCOME_TO_RPC_ABORTION = {
base_interfaces.CANCELLED: interfaces.CANCELLED,
base_interfaces.EXPIRED: interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE: interfaces.NETWORK_FAILURE,
base_interfaces.TRANSMISSION_FAILURE: interfaces.NETWORK_FAILURE,
base_interfaces.SERVICED_FAILURE: interfaces.SERVICED_FAILURE,
base_interfaces.SERVICER_FAILURE: interfaces.SERVICER_FAILURE,
}
def _as_operation_termination_callback(rpc_abortion_callback):
def operation_termination_callback(operation_outcome):
rpc_abortion = _OPERATION_OUTCOME_TO_RPC_ABORTION.get(
operation_outcome, None)
if rpc_abortion is not None:
rpc_abortion_callback(rpc_abortion)
return operation_termination_callback
def _abortion_outcome_to_exception(abortion_outcome):
if abortion_outcome == base_interfaces.CANCELLED:
return exceptions.CancellationError()
elif abortion_outcome == base_interfaces.EXPIRED:
return exceptions.ExpirationError()
elif abortion_outcome == base_interfaces.SERVICER_FAILURE:
return exceptions.ServicerError()
elif abortion_outcome == base_interfaces.SERVICED_FAILURE:
return exceptions.ServicedError()
else:
return exceptions.NetworkError()
class UnaryConsumer(stream.Consumer):
"""A stream.Consumer that should only ever be passed one value."""
def __init__(self, on_termination):
self._on_termination = on_termination
self._value = None
def consume(self, value):
self._value = value
def terminate(self):
self._on_termination(self._value)
def consume_and_terminate(self, value):
self._on_termination(value)
class Rendezvous(stream.Consumer):
"""A rendez-vous with stream.Consumer and iterator interfaces."""
def __init__(self):
self._condition = threading.Condition()
self._values = []
self._values_completed = False
self._abortion = None
def consume(self, value):
with self._condition:
self._values.append(value)
self._condition.notify()
def terminate(self):
with self._condition:
self._values_completed = True
self._condition.notify()
def consume_and_terminate(self, value):
with self._condition:
self._values.append(value)
self._values_completed = True
self._condition.notify()
def __iter__(self):
return self
def next(self):
with self._condition:
while ((self._abortion is None) and
(not self._values) and
(not self._values_completed)):
self._condition.wait()
if self._abortion is not None:
raise _abortion_outcome_to_exception(self._abortion)
elif self._values:
return self._values.pop(0)
elif self._values_completed:
raise StopIteration()
else:
raise AssertionError('Unreachable code reached!')
def set_outcome(self, outcome):
with self._condition:
if outcome != base_interfaces.COMPLETED:
self._abortion = outcome
self._condition.notify()
class RpcContext(interfaces.RpcContext):
"""A wrapped base_interfaces.OperationContext."""
def __init__(self, operation_context):
self._operation_context = operation_context
def is_active(self):
return self._operation_context.is_active()
def time_remaining(self):
return self._operation_context.time_remaining()
def add_abortion_callback(self, abortion_callback):
self._operation_context.add_termination_callback(
_as_operation_termination_callback(abortion_callback))
def pipe_iterator_to_consumer(iterator, consumer, active, terminate):
"""Pipes values emitted from an iterator to a stream.Consumer.
Args:
iterator: An iterator from which values will be emitted.
consumer: A stream.Consumer to which values will be passed.
active: A no-argument callable that returns True if the work being done by
this function is still valid and should not be abandoned and False if the
work being done by this function should be abandoned.
terminate: A boolean indicating whether or not this function should
terminate the given consumer after passing to it all values emitted by the
given iterator.
Raises:
abandonment.Abandoned: If this function quits early after seeing False
returned by the active function passed to it.
Exception: This function raises whatever exceptions are raised by iterating
over the given iterator.
"""
for element in iterator:
if not active():
raise abandonment.Abandoned()
consumer.consume(element)
if not active():
raise abandonment.Abandoned()
if terminate:
consumer.terminate()
def abortion_outcome_to_exception(abortion_outcome):
return _abortion_outcome_to_exception(abortion_outcome)
def as_operation_termination_callback(rpc_abortion_callback):
return _as_operation_termination_callback(rpc_abortion_callback)

@ -0,0 +1,189 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Behaviors for servicing RPCs."""
# base_interfaces and interfaces are referenced from specification in this
# module.
from _framework.base import interfaces as base_interfaces # pylint: disable=unused-import
from _framework.face import _control
from _framework.face import exceptions
from _framework.face import interfaces # pylint: disable=unused-import
from _framework.foundation import abandonment
from _framework.foundation import callable_util
from _framework.foundation import stream
from _framework.foundation import stream_util
class _ValueInStreamOutConsumer(stream.Consumer):
"""A stream.Consumer that maps inputs one-to-many onto outputs."""
def __init__(self, behavior, context, downstream):
"""Constructor.
Args:
behavior: A callable that takes a single value and an
interfaces.RpcContext and returns a generator of arbitrarily many
values.
context: An interfaces.RpcContext.
downstream: A stream.Consumer to which to pass the values generated by the
given behavior.
"""
self._behavior = behavior
self._context = context
self._downstream = downstream
def consume(self, value):
_control.pipe_iterator_to_consumer(
self._behavior(value, self._context), self._downstream,
self._context.is_active, False)
def terminate(self):
self._downstream.terminate()
def consume_and_terminate(self, value):
_control.pipe_iterator_to_consumer(
self._behavior(value, self._context), self._downstream,
self._context.is_active, True)
def _pool_wrap(behavior, operation_context):
"""Wraps an operation-related behavior so that it may be called in a pool.
Args:
behavior: A callable related to carrying out an operation.
operation_context: A base_interfaces.OperationContext for the operation.
Returns:
A callable that when called carries out the behavior of the given callable
and handles whatever exceptions it raises appropriately.
"""
def translation(*args):
try:
behavior(*args)
except (
abandonment.Abandoned,
exceptions.ExpirationError,
exceptions.CancellationError,
exceptions.ServicedError,
exceptions.NetworkError) as e:
if operation_context.is_active():
operation_context.fail(e)
except Exception as e:
operation_context.fail(e)
return callable_util.with_exceptions_logged(
translation, _control.INTERNAL_ERROR_LOG_MESSAGE)
def adapt_inline_value_in_value_out(method):
def adaptation(response_consumer, operation_context):
rpc_context = _control.RpcContext(operation_context)
return stream_util.TransformingConsumer(
lambda request: method.service(request, rpc_context), response_consumer)
return adaptation
def adapt_inline_value_in_stream_out(method):
def adaptation(response_consumer, operation_context):
rpc_context = _control.RpcContext(operation_context)
return _ValueInStreamOutConsumer(
method.service, rpc_context, response_consumer)
return adaptation
def adapt_inline_stream_in_value_out(method, pool):
def adaptation(response_consumer, operation_context):
rendezvous = _control.Rendezvous()
operation_context.add_termination_callback(rendezvous.set_outcome)
def in_pool_thread():
response_consumer.consume_and_terminate(
method.service(rendezvous, _control.RpcContext(operation_context)))
pool.submit(_pool_wrap(in_pool_thread, operation_context))
return rendezvous
return adaptation
def adapt_inline_stream_in_stream_out(method, pool):
"""Adapts an interfaces.InlineStreamInStreamOutMethod for use with Consumers.
RPCs may be serviced by calling the return value of this function, passing
request values to the stream.Consumer returned from that call, and receiving
response values from the stream.Consumer passed to that call.
Args:
method: An interfaces.InlineStreamInStreamOutMethod.
pool: A thread pool.
Returns:
A callable that takes a stream.Consumer and a
base_interfaces.OperationContext and returns a stream.Consumer.
"""
def adaptation(response_consumer, operation_context):
rendezvous = _control.Rendezvous()
operation_context.add_termination_callback(rendezvous.set_outcome)
def in_pool_thread():
_control.pipe_iterator_to_consumer(
method.service(rendezvous, _control.RpcContext(operation_context)),
response_consumer, operation_context.is_active, True)
pool.submit(_pool_wrap(in_pool_thread, operation_context))
return rendezvous
return adaptation
def adapt_event_value_in_value_out(method):
def adaptation(response_consumer, operation_context):
def on_payload(payload):
method.service(
payload, response_consumer.consume_and_terminate,
_control.RpcContext(operation_context))
return _control.UnaryConsumer(on_payload)
return adaptation
def adapt_event_value_in_stream_out(method):
def adaptation(response_consumer, operation_context):
def on_payload(payload):
method.service(
payload, response_consumer, _control.RpcContext(operation_context))
return _control.UnaryConsumer(on_payload)
return adaptation
def adapt_event_stream_in_value_out(method):
def adaptation(response_consumer, operation_context):
rpc_context = _control.RpcContext(operation_context)
return method.service(response_consumer.consume_and_terminate, rpc_context)
return adaptation
def adapt_event_stream_in_stream_out(method):
def adaptation(response_consumer, operation_context):
return method.service(
response_consumer, _control.RpcContext(operation_context))
return adaptation

@ -0,0 +1,81 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Common lifecycle code for in-memory-ticket-exchange Face-layer tests."""
from _framework.face import implementations
from _framework.face.testing import base_util
from _framework.face.testing import test_case
from _framework.foundation import logging_pool
_TIMEOUT = 3
_MAXIMUM_POOL_SIZE = 100
class FaceTestCase(test_case.FaceTestCase):
"""Provides abstract Face-layer tests an in-memory implementation."""
def set_up_implementation(
self,
name,
methods,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods,
multi_method):
servicer_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
stub_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
servicer = implementations.servicer(
servicer_pool,
inline_value_in_value_out_methods=inline_value_in_value_out_methods,
inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
event_value_in_value_out_methods=event_value_in_value_out_methods,
event_value_in_stream_out_methods=event_value_in_stream_out_methods,
event_stream_in_value_out_methods=event_stream_in_value_out_methods,
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
multi_method=multi_method)
linked_pair = base_util.linked_pair(servicer, _TIMEOUT)
server = implementations.server()
stub = implementations.stub(linked_pair.front, stub_pool)
return server, stub, (servicer_pool, stub_pool, linked_pair)
def tear_down_implementation(self, memo):
servicer_pool, stub_pool, linked_pair = memo
linked_pair.shut_down()
stub_pool.shutdown(wait=True)
servicer_pool.shutdown(wait=True)

@ -0,0 +1,46 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from _framework.face import _test_case
from _framework.face.testing import blocking_invocation_inline_service_test_case as test_case
class BlockingInvocationInlineServiceTest(
_test_case.FaceTestCase,
test_case.BlockingInvocationInlineServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,118 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Demonstration-suitable implementation of the face layer of RPC Framework."""
from _framework.base import util as _base_util
from _framework.base.packets import implementations as _tickets_implementations
from _framework.face import implementations
from _framework.foundation import logging_pool
_POOL_SIZE_LIMIT = 20
_MAXIMUM_TIMEOUT = 90
class LinkedPair(object):
"""A Server and Stub that are linked to one another.
Attributes:
server: A Server.
stub: A Stub.
"""
def shut_down(self):
"""Shuts down this object and releases its resources."""
raise NotImplementedError()
class _LinkedPair(LinkedPair):
def __init__(self, server, stub, front, back, pools):
self.server = server
self.stub = stub
self._front = front
self._back = back
self._pools = pools
def shut_down(self):
_base_util.wait_for_idle(self._front)
_base_util.wait_for_idle(self._back)
for pool in self._pools:
pool.shutdown(wait=True)
def server_and_stub(
default_timeout,
inline_value_in_value_out_methods=None,
inline_value_in_stream_out_methods=None,
inline_stream_in_value_out_methods=None,
inline_stream_in_stream_out_methods=None,
event_value_in_value_out_methods=None,
event_value_in_stream_out_methods=None,
event_stream_in_value_out_methods=None,
event_stream_in_stream_out_methods=None,
multi_method=None):
"""Creates a Server and Stub linked together for use."""
front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
stub_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
pools = (
front_work_pool, front_transmission_pool, front_utility_pool,
back_work_pool, back_transmission_pool, back_utility_pool,
stub_pool)
servicer = implementations.servicer(
back_work_pool,
inline_value_in_value_out_methods=inline_value_in_value_out_methods,
inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
event_value_in_value_out_methods=event_value_in_value_out_methods,
event_value_in_stream_out_methods=event_value_in_stream_out_methods,
event_stream_in_value_out_methods=event_stream_in_value_out_methods,
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
multi_method=multi_method)
front = _tickets_implementations.front(
front_work_pool, front_transmission_pool, front_utility_pool)
back = _tickets_implementations.back(
servicer, back_work_pool, back_transmission_pool, back_utility_pool,
default_timeout, _MAXIMUM_TIMEOUT)
front.join_rear_link(back)
back.join_fore_link(front)
stub = implementations.stub(front, stub_pool)
return _LinkedPair(implementations.server(), stub, front, back, pools)

@ -0,0 +1,46 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from _framework.face import _test_case
from _framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case
class EventInvocationSynchronousEventServiceTest(
_test_case.FaceTestCase,
test_case.EventInvocationSynchronousEventServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,77 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Exceptions used in the Face layer of RPC Framework."""
import abc
class NoSuchMethodError(Exception):
"""Raised by customer code to indicate an unrecognized RPC method name.
Attributes:
name: The unrecognized name.
"""
def __init__(self, name):
"""Constructor.
Args:
name: The unrecognized RPC method name.
"""
super(NoSuchMethodError, self).__init__()
self.name = name
class RpcError(Exception):
"""Common super type for all exceptions raised by the Face layer.
Only RPC Framework should instantiate and raise these exceptions.
"""
__metaclass__ = abc.ABCMeta
class CancellationError(RpcError):
"""Indicates that an RPC has been cancelled."""
class ExpirationError(RpcError):
"""Indicates that an RPC has expired ("timed out")."""
class NetworkError(RpcError):
"""Indicates that some error occurred on the network."""
class ServicedError(RpcError):
"""Indicates that the Serviced failed in the course of an RPC."""
class ServicerError(RpcError):
"""Indicates that the Servicer failed in the course of servicing an RPC."""

@ -0,0 +1,46 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from _framework.face import _test_case
from _framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case
class FutureInvocationAsynchronousEventServiceTest(
_test_case.FaceTestCase,
test_case.FutureInvocationAsynchronousEventServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,246 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into the Face layer of RPC Framework."""
from _framework.base import exceptions as _base_exceptions
from _framework.base import interfaces as base_interfaces
from _framework.face import _calls
from _framework.face import _service
from _framework.face import exceptions
from _framework.face import interfaces
class _BaseServicer(base_interfaces.Servicer):
def __init__(self, methods, multi_method):
self._methods = methods
self._multi_method = multi_method
def service(self, name, context, output_consumer):
method = self._methods.get(name, None)
if method is not None:
return method(output_consumer, context)
elif self._multi_method is not None:
try:
return self._multi_method.service(name, output_consumer, context)
except exceptions.NoSuchMethodError:
raise _base_exceptions.NoSuchMethodError()
else:
raise _base_exceptions.NoSuchMethodError()
class _Server(interfaces.Server):
"""An interfaces.Server implementation."""
class _Stub(interfaces.Stub):
"""An interfaces.Stub implementation."""
def __init__(self, front, pool):
self._front = front
self._pool = pool
def blocking_value_in_value_out(self, name, request, timeout):
return _calls.blocking_value_in_value_out(
self._front, name, request, timeout, 'unused trace ID')
def future_value_in_value_out(self, name, request, timeout):
return _calls.future_value_in_value_out(
self._front, name, request, timeout, 'unused trace ID')
def inline_value_in_stream_out(self, name, request, timeout):
return _calls.inline_value_in_stream_out(
self._front, name, request, timeout, 'unused trace ID')
def blocking_stream_in_value_out(self, name, request_iterator, timeout):
return _calls.blocking_stream_in_value_out(
self._front, name, request_iterator, timeout, 'unused trace ID')
def future_stream_in_value_out(self, name, request_iterator, timeout):
return _calls.future_stream_in_value_out(
self._front, name, request_iterator, timeout, 'unused trace ID',
self._pool)
def inline_stream_in_stream_out(self, name, request_iterator, timeout):
return _calls.inline_stream_in_stream_out(
self._front, name, request_iterator, timeout, 'unused trace ID',
self._pool)
def event_value_in_value_out(
self, name, request, response_callback, abortion_callback, timeout):
return _calls.event_value_in_value_out(
self._front, name, request, response_callback, abortion_callback,
timeout, 'unused trace ID')
def event_value_in_stream_out(
self, name, request, response_consumer, abortion_callback, timeout):
return _calls.event_value_in_stream_out(
self._front, name, request, response_consumer, abortion_callback,
timeout, 'unused trace ID')
def event_stream_in_value_out(
self, name, response_callback, abortion_callback, timeout):
return _calls.event_stream_in_value_out(
self._front, name, response_callback, abortion_callback, timeout,
'unused trace ID')
def event_stream_in_stream_out(
self, name, response_consumer, abortion_callback, timeout):
return _calls.event_stream_in_stream_out(
self._front, name, response_consumer, abortion_callback, timeout,
'unused trace ID')
def _aggregate_methods(
pool,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods):
"""Aggregates methods coded in according to different interfaces."""
methods = {}
def adapt_unpooled_methods(adapted_methods, unadapted_methods, adaptation):
if unadapted_methods is not None:
for name, unadapted_method in unadapted_methods.iteritems():
adapted_methods[name] = adaptation(unadapted_method)
def adapt_pooled_methods(adapted_methods, unadapted_methods, adaptation):
if unadapted_methods is not None:
for name, unadapted_method in unadapted_methods.iteritems():
adapted_methods[name] = adaptation(unadapted_method, pool)
adapt_unpooled_methods(
methods, inline_value_in_value_out_methods,
_service.adapt_inline_value_in_value_out)
adapt_unpooled_methods(
methods, inline_value_in_stream_out_methods,
_service.adapt_inline_value_in_stream_out)
adapt_pooled_methods(
methods, inline_stream_in_value_out_methods,
_service.adapt_inline_stream_in_value_out)
adapt_pooled_methods(
methods, inline_stream_in_stream_out_methods,
_service.adapt_inline_stream_in_stream_out)
adapt_unpooled_methods(
methods, event_value_in_value_out_methods,
_service.adapt_event_value_in_value_out)
adapt_unpooled_methods(
methods, event_value_in_stream_out_methods,
_service.adapt_event_value_in_stream_out)
adapt_unpooled_methods(
methods, event_stream_in_value_out_methods,
_service.adapt_event_stream_in_value_out)
adapt_unpooled_methods(
methods, event_stream_in_stream_out_methods,
_service.adapt_event_stream_in_stream_out)
return methods
def servicer(
pool,
inline_value_in_value_out_methods=None,
inline_value_in_stream_out_methods=None,
inline_stream_in_value_out_methods=None,
inline_stream_in_stream_out_methods=None,
event_value_in_value_out_methods=None,
event_value_in_stream_out_methods=None,
event_stream_in_value_out_methods=None,
event_stream_in_stream_out_methods=None,
multi_method=None):
"""Creates a base_interfaces.Servicer.
The key sets of the passed dictionaries must be disjoint. It is guaranteed
that any passed MultiMethod implementation will only be called to service an
RPC if the RPC method name is not present in the key sets of the passed
dictionaries.
Args:
pool: A thread pool.
inline_value_in_value_out_methods: A dictionary mapping method names to
interfaces.InlineValueInValueOutMethod implementations.
inline_value_in_stream_out_methods: A dictionary mapping method names to
interfaces.InlineValueInStreamOutMethod implementations.
inline_stream_in_value_out_methods: A dictionary mapping method names to
interfaces.InlineStreamInValueOutMethod implementations.
inline_stream_in_stream_out_methods: A dictionary mapping method names to
interfaces.InlineStreamInStreamOutMethod implementations.
event_value_in_value_out_methods: A dictionary mapping method names to
interfaces.EventValueInValueOutMethod implementations.
event_value_in_stream_out_methods: A dictionary mapping method names to
interfaces.EventValueInStreamOutMethod implementations.
event_stream_in_value_out_methods: A dictionary mapping method names to
interfaces.EventStreamInValueOutMethod implementations.
event_stream_in_stream_out_methods: A dictionary mapping method names to
interfaces.EventStreamInStreamOutMethod implementations.
multi_method: An implementation of interfaces.MultiMethod.
Returns:
A base_interfaces.Servicer that services RPCs via the given implementations.
"""
methods = _aggregate_methods(
pool,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods)
return _BaseServicer(methods, multi_method)
def server():
"""Creates an interfaces.Server.
Returns:
An interfaces.Server.
"""
return _Server()
def stub(front, pool):
"""Creates an interfaces.Stub.
Args:
front: A base_interfaces.Front.
pool: A futures.ThreadPoolExecutor.
Returns:
An interfaces.Stub that performs RPCs via the given base_interfaces.Front.
"""
return _Stub(front, pool)

@ -0,0 +1,545 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces for the face layer of RPC Framework."""
import abc
# exceptions, abandonment, and future are referenced from specification in this
# module.
from _framework.face import exceptions # pylint: disable=unused-import
from _framework.foundation import abandonment # pylint: disable=unused-import
from _framework.foundation import future # pylint: disable=unused-import
class CancellableIterator(object):
"""Implements the Iterator protocol and affords a cancel method."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __iter__(self):
"""Returns the self object in accordance with the Iterator protocol."""
raise NotImplementedError()
@abc.abstractmethod
def next(self):
"""Returns a value or raises StopIteration per the Iterator protocol."""
raise NotImplementedError()
@abc.abstractmethod
def cancel(self):
"""Requests cancellation of whatever computation underlies this iterator."""
raise NotImplementedError()
# Constants that categorize RPC abortion.
# TODO(nathaniel): Learn and use Python's enum library for this de facto
# enumerated type
CANCELLED = 'abortion: cancelled'
EXPIRED = 'abortion: expired'
NETWORK_FAILURE = 'abortion: network failure'
SERVICED_FAILURE = 'abortion: serviced failure'
SERVICER_FAILURE = 'abortion: servicer failure'
class RpcContext(object):
"""Provides RPC-related information and action."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def is_active(self):
"""Describes whether the RPC is active or has terminated."""
raise NotImplementedError()
@abc.abstractmethod
def time_remaining(self):
"""Describes the length of allowed time remaining for the RPC.
Returns:
A nonnegative float indicating the length of allowed time in seconds
remaining for the RPC to complete before it is considered to have timed
out.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_abortion_callback(self, abortion_callback):
"""Registers a callback to be called if the RPC is aborted.
Args:
abortion_callback: A callable to be called and passed one of CANCELLED,
EXPIRED, NETWORK_FAILURE, SERVICED_FAILURE, or SERVICER_FAILURE in the
event of RPC abortion.
"""
raise NotImplementedError()
class InlineValueInValueOutMethod(object):
"""A type for inline unary-request-unary-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, context):
"""Services an RPC that accepts one value and produces one value.
Args:
request: The single request value for the RPC.
context: An RpcContext object.
Returns:
The single response value for the RPC.
Raises:
abandonment.Abandoned: If no response is necessary because the RPC has
been aborted.
"""
raise NotImplementedError()
class InlineValueInStreamOutMethod(object):
"""A type for inline unary-request-stream-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, context):
"""Services an RPC that accepts one value and produces a stream of values.
Args:
request: The single request value for the RPC.
context: An RpcContext object.
Yields:
The values that comprise the response stream of the RPC.
Raises:
abandonment.Abandoned: If completing the response stream is not necessary
because the RPC has been aborted.
"""
raise NotImplementedError()
class InlineStreamInValueOutMethod(object):
"""A type for inline stream-request-unary-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request_iterator, context):
"""Services an RPC that accepts a stream of values and produces one value.
Args:
request_iterator: An iterator that yields the request values of the RPC.
Drawing values from this iterator may also raise exceptions.RpcError to
indicate abortion of the RPC.
context: An RpcContext object.
Yields:
The values that comprise the response stream of the RPC.
Raises:
abandonment.Abandoned: If no response is necessary because the RPC has
been aborted.
exceptions.RpcError: Implementations of this method must not deliberately
raise exceptions.RpcError but may allow such errors raised by the
request_iterator passed to them to propagate through their bodies
uncaught.
"""
raise NotImplementedError()
class InlineStreamInStreamOutMethod(object):
"""A type for inline stream-request-stream-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request_iterator, context):
"""Services an RPC that accepts and produces streams of values.
Args:
request_iterator: An iterator that yields the request values of the RPC.
Drawing values from this iterator may also raise exceptions.RpcError to
indicate abortion of the RPC.
context: An RpcContext object.
Yields:
The values that comprise the response stream of the RPC.
Raises:
abandonment.Abandoned: If completing the response stream is not necessary
because the RPC has been aborted.
exceptions.RpcError: Implementations of this method must not deliberately
raise exceptions.RpcError but may allow such errors raised by the
request_iterator passed to them to propagate through their bodies
uncaught.
"""
raise NotImplementedError()
class EventValueInValueOutMethod(object):
"""A type for event-driven unary-request-unary-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, response_callback, context):
"""Services an RPC that accepts one value and produces one value.
Args:
request: The single request value for the RPC.
response_callback: A callback to be called to accept the response value of
the RPC.
context: An RpcContext object.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class EventValueInStreamOutMethod(object):
"""A type for event-driven unary-request-stream-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, response_consumer, context):
"""Services an RPC that accepts one value and produces a stream of values.
Args:
request: The single request value for the RPC.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
context: An RpcContext object.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class EventStreamInValueOutMethod(object):
"""A type for event-driven stream-request-unary-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, response_callback, context):
"""Services an RPC that accepts a stream of values and produces one value.
Args:
response_callback: A callback to be called to accept the response value of
the RPC.
context: An RpcContext object.
Returns:
A stream.Consumer with which to accept the request values of the RPC. The
consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing values to this object. Implementations must not assume that this
object will be called to completion of the request stream or even called
at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class EventStreamInStreamOutMethod(object):
"""A type for event-driven stream-request-stream-response RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, response_consumer, context):
"""Services an RPC that accepts and produces streams of values.
Args:
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
context: An RpcContext object.
Returns:
A stream.Consumer with which to accept the request values of the RPC. The
consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing values to this object. Implementations must not assume that this
object will be called to completion of the request stream or even called
at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class MultiMethod(object):
"""A general type able to service many RPC methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, name, response_consumer, context):
"""Services an RPC.
Args:
name: The RPC method name.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
context: An RpcContext object.
Returns:
A stream.Consumer with which to accept the request values of the RPC. The
consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing values to this object. Implementations must not assume that this
object will be called to completion of the request stream or even called
at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
exceptions.NoSuchMethodError: If this MultiMethod does not recognize the
given RPC method name and is not able to service the RPC.
"""
raise NotImplementedError()
class Server(object):
"""Specification of a running server that services RPCs."""
__metaclass__ = abc.ABCMeta
class Call(object):
"""Invocation-side representation of an RPC.
Attributes:
context: An RpcContext affording information about the RPC.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def cancel(self):
"""Requests cancellation of the RPC."""
raise NotImplementedError()
class Stub(object):
"""Affords RPC methods to callers."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def blocking_value_in_value_out(self, name, request, timeout):
"""Invokes a unary-request-unary-response RPC method.
This method blocks until either returning the response value of the RPC
(in the event of RPC completion) or raising an exception (in the event of
RPC abortion).
Args:
name: The RPC method name.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
The response value for the RPC.
Raises:
exceptions.RpcError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def future_value_in_value_out(self, name, request, timeout):
"""Invokes a unary-request-unary-response RPC method.
Args:
name: The RPC method name.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A future.Future representing the RPC. In the event of RPC completion, the
returned Future will return an outcome indicating that the RPC returned
the response value of the RPC. In the event of RPC abortion, the
returned Future will return an outcome indicating that the RPC raised
an exceptions.RpcError.
"""
raise NotImplementedError()
@abc.abstractmethod
def inline_value_in_stream_out(self, name, request, timeout):
"""Invokes a unary-request-stream-response RPC method.
Args:
name: The RPC method name.
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A CancellableIterator that yields the response values of the RPC and
affords RPC cancellation. Drawing response values from the returned
CancellableIterator may raise exceptions.RpcError indicating abortion of
the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def blocking_stream_in_value_out(self, name, request_iterator, timeout):
"""Invokes a stream-request-unary-response RPC method.
This method blocks until either returning the response value of the RPC
(in the event of RPC completion) or raising an exception (in the event of
RPC abortion).
Args:
name: The RPC method name.
request_iterator: An iterator that yields the request values of the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
The response value for the RPC.
Raises:
exceptions.RpcError: Indicating that the RPC was aborted.
"""
raise NotImplementedError()
@abc.abstractmethod
def future_stream_in_value_out(self, name, request_iterator, timeout):
"""Invokes a stream-request-unary-response RPC method.
Args:
name: The RPC method name.
request_iterator: An iterator that yields the request values of the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A future.Future representing the RPC. In the event of RPC completion, the
returned Future will return an outcome indicating that the RPC returned
the response value of the RPC. In the event of RPC abortion, the
returned Future will return an outcome indicating that the RPC raised
an exceptions.RpcError.
"""
raise NotImplementedError()
@abc.abstractmethod
def inline_stream_in_stream_out(self, name, request_iterator, timeout):
"""Invokes a stream-request-stream-response RPC method.
Args:
name: The RPC method name.
request_iterator: An iterator that yields the request values of the RPC.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A CancellableIterator that yields the response values of the RPC and
affords RPC cancellation. Drawing response values from the returned
CancellableIterator may raise exceptions.RpcError indicating abortion of
the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def event_value_in_value_out(
self, name, request, response_callback, abortion_callback, timeout):
"""Event-driven invocation of a unary-request-unary-response RPC method.
Args:
name: The RPC method name.
request: The request value for the RPC.
response_callback: A callback to be called to accept the response value
of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED,
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A Call object for the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def event_value_in_stream_out(
self, name, request, response_consumer, abortion_callback, timeout):
"""Event-driven invocation of a unary-request-stream-response RPC method.
Args:
name: The RPC method name.
request: The request value for the RPC.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED,
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A Call object for the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def event_stream_in_value_out(
self, name, response_callback, abortion_callback, timeout):
"""Event-driven invocation of a unary-request-unary-response RPC method.
Args:
name: The RPC method name.
response_callback: A callback to be called to accept the response value
of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED,
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the
request values of the RPC should be passed.
"""
raise NotImplementedError()
@abc.abstractmethod
def event_stream_in_stream_out(
self, name, response_consumer, abortion_callback, timeout):
"""Event-driven invocation of a unary-request-stream-response RPC method.
Args:
name: The RPC method name.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED,
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the
request values of the RPC should be passed.
"""
raise NotImplementedError()

@ -0,0 +1,102 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for creating Base-layer objects for use in Face-layer tests."""
import abc
# interfaces is referenced from specification in this module.
from _framework.base import util as _base_util
from _framework.base.packets import implementations
from _framework.base.packets import in_memory
from _framework.base.packets import interfaces # pylint: disable=unused-import
from _framework.foundation import logging_pool
_POOL_SIZE_LIMIT = 20
_MAXIMUM_TIMEOUT = 90
class LinkedPair(object):
"""A Front and Back that are linked to one another.
Attributes:
front: An interfaces.Front.
back: An interfaces.Back.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def shut_down(self):
"""Shuts down this object and releases its resources."""
raise NotImplementedError()
class _LinkedPair(LinkedPair):
def __init__(self, front, back, pools):
self.front = front
self.back = back
self._pools = pools
def shut_down(self):
_base_util.wait_for_idle(self.front)
_base_util.wait_for_idle(self.back)
for pool in self._pools:
pool.shutdown(wait=True)
def linked_pair(servicer, default_timeout):
"""Creates a Server and Stub linked together for use."""
link_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
pools = (
link_pool,
front_work_pool, front_transmission_pool, front_utility_pool,
back_work_pool, back_transmission_pool, back_utility_pool)
link = in_memory.Link(link_pool)
front = implementations.front(
front_work_pool, front_transmission_pool, front_utility_pool)
back = implementations.back(
servicer, back_work_pool, back_transmission_pool, back_utility_pool,
default_timeout, _MAXIMUM_TIMEOUT)
front.join_rear_link(link)
link.join_fore_link(front)
back.join_fore_link(link)
link.join_rear_link(back)
return _LinkedPair(front, back, pools)

@ -0,0 +1,223 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A test to verify an implementation of the Face layer of RPC Framework."""
# unittest is referenced from specification in this module.
import abc
import unittest # pylint: disable=unused-import
from _framework.face import exceptions
from _framework.face.testing import control
from _framework.face.testing import coverage
from _framework.face.testing import digest
from _framework.face.testing import stock_service
from _framework.face.testing import test_case
_TIMEOUT = 3
class BlockingInvocationInlineServiceTestCase(
test_case.FaceTestCase, coverage.BlockingCoverage):
"""A test of the Face layer of RPC Framework.
Concrete subclasses must also extend unittest.TestCase.
"""
__metaclass__ = abc.ABCMeta
def setUp(self):
"""See unittest.TestCase.setUp for full specification.
Overriding implementations must call this implementation.
"""
self.control = control.PauseFailControl()
self.digest = digest.digest(
stock_service.STOCK_TEST_SERVICE, self.control, None)
self.server, self.stub, self.memo = self.set_up_implementation(
self.digest.name, self.digest.methods,
self.digest.inline_unary_unary_methods,
self.digest.inline_unary_stream_methods,
self.digest.inline_stream_unary_methods,
self.digest.inline_stream_stream_methods,
{}, {}, {}, {}, None)
def tearDown(self):
"""See unittest.TestCase.tearDown for full specification.
Overriding implementations must call this implementation.
"""
self.tear_down_implementation(self.memo)
def testSuccessfulUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
response = self.stub.blocking_value_in_value_out(
name, request, _TIMEOUT)
test_messages.verify(request, response, self)
def testSuccessfulUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
def testSuccessfulStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
response = self.stub.blocking_stream_in_value_out(
name, iter(requests), _TIMEOUT)
test_messages.verify(requests, response, self)
def testSuccessfulStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
def testSequentialInvocations(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
first_request = test_messages.request()
second_request = test_messages.request()
first_response = self.stub.blocking_value_in_value_out(
name, first_request, _TIMEOUT)
test_messages.verify(first_request, first_response, self)
second_response = self.stub.blocking_value_in_value_out(
name, second_request, _TIMEOUT)
test_messages.verify(second_request, second_response, self)
def testExpiredUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
list(response_iterator)

@ -0,0 +1,94 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A utility useful in tests of asynchronous, event-driven interfaces."""
import threading
from _framework.foundation import stream
class Callback(stream.Consumer):
"""A utility object useful in tests of asynchronous code."""
def __init__(self):
self._condition = threading.Condition()
self._unary_response = None
self._streamed_responses = []
self._completed = False
self._abortion = None
def abort(self, abortion):
with self._condition:
self._abortion = abortion
self._condition.notify_all()
def complete(self, unary_response):
with self._condition:
self._unary_response = unary_response
self._completed = True
self._condition.notify_all()
def consume(self, streamed_response):
with self._condition:
self._streamed_responses.append(streamed_response)
def terminate(self):
with self._condition:
self._completed = True
self._condition.notify_all()
def consume_and_terminate(self, streamed_response):
with self._condition:
self._streamed_responses.append(streamed_response)
self._completed = True
self._condition.notify_all()
def block_until_terminated(self):
with self._condition:
while self._abortion is None and not self._completed:
self._condition.wait()
def response(self):
with self._condition:
if self._abortion is None:
return self._unary_response
else:
raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
def responses(self):
with self._condition:
if self._abortion is None:
return list(self._streamed_responses)
else:
raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
def abortion(self):
with self._condition:
return self._abortion

@ -0,0 +1,87 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Code for instructing systems under test to block or fail."""
import abc
import contextlib
import threading
class Control(object):
"""An object that accepts program control from a system under test.
Systems under test passed a Control should call its control() method
frequently during execution. The control() method may block, raise an
exception, or do nothing, all according to the enclosing test's desire for
the system under test to simulate hanging, failing, or functioning.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def control(self):
"""Potentially does anything."""
raise NotImplementedError()
class PauseFailControl(Control):
"""A Control that can be used to pause or fail code under control."""
def __init__(self):
self._condition = threading.Condition()
self._paused = False
self._fail = False
def control(self):
with self._condition:
if self._fail:
raise ValueError()
while self._paused:
self._condition.wait()
@contextlib.contextmanager
def pause(self):
"""Pauses code under control while controlling code is in context."""
with self._condition:
self._paused = True
yield
with self._condition:
self._paused = False
self._condition.notify_all()
@contextlib.contextmanager
def fail(self):
"""Fails code under control while controlling code is in context."""
with self._condition:
self._fail = True
yield
with self._condition:
self._fail = False

@ -0,0 +1,123 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Governs coverage for the tests of the Face layer of RPC Framework."""
import abc
# These classes are only valid when inherited by unittest.TestCases.
# pylint: disable=invalid-name
class BlockingCoverage(object):
"""Specification of test coverage for blocking behaviors."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def testSuccessfulUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testSuccessfulUnaryRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testSuccessfulStreamRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testSuccessfulStreamRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testSequentialInvocations(self):
raise NotImplementedError()
@abc.abstractmethod
def testExpiredUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testExpiredUnaryRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testExpiredStreamRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testExpiredStreamRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testFailedUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testFailedUnaryRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testFailedStreamRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testFailedStreamRequestStreamResponse(self):
raise NotImplementedError()
class FullCoverage(BlockingCoverage):
"""Specification of test coverage for non-blocking behaviors."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def testParallelInvocations(self):
raise NotImplementedError()
@abc.abstractmethod
def testWaitingForSomeButNotAllParallelInvocations(self):
raise NotImplementedError()
@abc.abstractmethod
def testCancelledUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testCancelledUnaryRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testCancelledStreamRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testCancelledStreamRequestStreamResponse(self):
raise NotImplementedError()

@ -0,0 +1,446 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Code for making a service.TestService more amenable to use in tests."""
import collections
import threading
# testing_control, interfaces, and testing_service are referenced from
# specification in this module.
from _framework.face import exceptions
from _framework.face import interfaces as face_interfaces
from _framework.face.testing import control as testing_control # pylint: disable=unused-import
from _framework.face.testing import interfaces # pylint: disable=unused-import
from _framework.face.testing import service as testing_service # pylint: disable=unused-import
from _framework.foundation import stream
from _framework.foundation import stream_util
_IDENTITY = lambda x: x
class TestServiceDigest(
collections.namedtuple(
'TestServiceDigest',
['name',
'methods',
'inline_unary_unary_methods',
'inline_unary_stream_methods',
'inline_stream_unary_methods',
'inline_stream_stream_methods',
'event_unary_unary_methods',
'event_unary_stream_methods',
'event_stream_unary_methods',
'event_stream_stream_methods',
'multi_method',
'unary_unary_messages_sequences',
'unary_stream_messages_sequences',
'stream_unary_messages_sequences',
'stream_stream_messages_sequences'])):
"""A transformation of a service.TestService.
Attributes:
name: The RPC service name to be used in the test.
methods: A sequence of interfaces.Method objects describing the RPC
methods that will be called during the test.
inline_unary_unary_methods: A dict from method name to
face_interfaces.InlineValueInValueOutMethod object to be used in tests of
in-line calls to behaviors under test.
inline_unary_stream_methods: A dict from method name to
face_interfaces.InlineValueInStreamOutMethod object to be used in tests of
in-line calls to behaviors under test.
inline_stream_unary_methods: A dict from method name to
face_interfaces.InlineStreamInValueOutMethod object to be used in tests of
in-line calls to behaviors under test.
inline_stream_stream_methods: A dict from method name to
face_interfaces.InlineStreamInStreamOutMethod object to be used in tests
of in-line calls to behaviors under test.
event_unary_unary_methods: A dict from method name to
face_interfaces.EventValueInValueOutMethod object to be used in tests of
event-driven calls to behaviors under test.
event_unary_stream_methods: A dict from method name to
face_interfaces.EventValueInStreamOutMethod object to be used in tests of
event-driven calls to behaviors under test.
event_stream_unary_methods: A dict from method name to
face_interfaces.EventStreamInValueOutMethod object to be used in tests of
event-driven calls to behaviors under test.
event_stream_stream_methods: A dict from method name to
face_interfaces.EventStreamInStreamOutMethod object to be used in tests of
event-driven calls to behaviors under test.
multi_method: A face_interfaces.MultiMethod to be used in tests of generic
calls to behaviors under test.
unary_unary_messages_sequences: A dict from method name to sequence of
service.UnaryUnaryTestMessages objects to be used to test the method
with the given name.
unary_stream_messages_sequences: A dict from method name to sequence of
service.UnaryStreamTestMessages objects to be used to test the method
with the given name.
stream_unary_messages_sequences: A dict from method name to sequence of
service.StreamUnaryTestMessages objects to be used to test the method
with the given name.
stream_stream_messages_sequences: A dict from method name to sequence of
service.StreamStreamTestMessages objects to be used to test the
method with the given name.
serialization: A serial.Serialization object describing serialization
behaviors for all the RPC methods.
"""
class _BufferingConsumer(stream.Consumer):
"""A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
def __init__(self):
self.consumed = []
self.terminated = False
def consume(self, value):
self.consumed.append(value)
def terminate(self):
self.terminated = True
def consume_and_terminate(self, value):
self.consumed.append(value)
self.terminated = True
class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod):
def __init__(self, unary_unary_test_method, control):
self._test_method = unary_unary_test_method
self._control = control
def service(self, request, context):
response_list = []
self._test_method.service(
request, response_list.append, context, self._control)
return response_list.pop(0)
class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod):
def __init__(self, unary_unary_test_method, control, pool):
self._test_method = unary_unary_test_method
self._control = control
self._pool = pool
def service(self, request, response_callback, context):
if self._pool is None:
self._test_method.service(
request, response_callback, context, self._control)
else:
self._pool.submit(
self._test_method.service, request, response_callback, context,
self._control)
class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod):
def __init__(self, unary_stream_test_method, control):
self._test_method = unary_stream_test_method
self._control = control
def service(self, request, context):
response_consumer = _BufferingConsumer()
self._test_method.service(
request, response_consumer, context, self._control)
for response in response_consumer.consumed:
yield response
class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod):
def __init__(self, unary_stream_test_method, control, pool):
self._test_method = unary_stream_test_method
self._control = control
self._pool = pool
def service(self, request, response_consumer, context):
if self._pool is None:
self._test_method.service(
request, response_consumer, context, self._control)
else:
self._pool.submit(
self._test_method.service, request, response_consumer, context,
self._control)
class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod):
def __init__(self, stream_unary_test_method, control):
self._test_method = stream_unary_test_method
self._control = control
def service(self, request_iterator, context):
response_list = []
request_consumer = self._test_method.service(
response_list.append, context, self._control)
for request in request_iterator:
request_consumer.consume(request)
request_consumer.terminate()
return response_list.pop(0)
class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod):
def __init__(self, stream_unary_test_method, control, pool):
self._test_method = stream_unary_test_method
self._control = control
self._pool = pool
def service(self, response_callback, context):
request_consumer = self._test_method.service(
response_callback, context, self._control)
if self._pool is None:
return request_consumer
else:
return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod):
def __init__(self, stream_stream_test_method, control):
self._test_method = stream_stream_test_method
self._control = control
def service(self, request_iterator, context):
response_consumer = _BufferingConsumer()
request_consumer = self._test_method.service(
response_consumer, context, self._control)
for request in request_iterator:
request_consumer.consume(request)
while response_consumer.consumed:
yield response_consumer.consumed.pop(0)
response_consumer.terminate()
class _EventStreamStreamMethod(face_interfaces.EventStreamInStreamOutMethod):
def __init__(self, stream_stream_test_method, control, pool):
self._test_method = stream_stream_test_method
self._control = control
self._pool = pool
def service(self, response_consumer, context):
request_consumer = self._test_method.service(
response_consumer, context, self._control)
if self._pool is None:
return request_consumer
else:
return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
class _UnaryConsumer(stream.Consumer):
"""A Consumer that only allows consumption of exactly one value."""
def __init__(self, action):
self._lock = threading.Lock()
self._action = action
self._consumed = False
self._terminated = False
def consume(self, value):
with self._lock:
if self._consumed:
raise ValueError('Unary consumer already consumed!')
elif self._terminated:
raise ValueError('Unary consumer already terminated!')
else:
self._consumed = True
self._action(value)
def terminate(self):
with self._lock:
if not self._consumed:
raise ValueError('Unary consumer hasn\'t yet consumed!')
elif self._terminated:
raise ValueError('Unary consumer already terminated!')
else:
self._terminated = True
def consume_and_terminate(self, value):
with self._lock:
if self._consumed:
raise ValueError('Unary consumer already consumed!')
elif self._terminated:
raise ValueError('Unary consumer already terminated!')
else:
self._consumed = True
self._terminated = True
self._action(value)
class _UnaryUnaryAdaptation(object):
def __init__(self, unary_unary_test_method):
self._method = unary_unary_test_method
def service(self, response_consumer, context, control):
def action(request):
self._method.service(
request, response_consumer.consume_and_terminate, context, control)
return _UnaryConsumer(action)
class _UnaryStreamAdaptation(object):
def __init__(self, unary_stream_test_method):
self._method = unary_stream_test_method
def service(self, response_consumer, context, control):
def action(request):
self._method.service(request, response_consumer, context, control)
return _UnaryConsumer(action)
class _StreamUnaryAdaptation(object):
def __init__(self, stream_unary_test_method):
self._method = stream_unary_test_method
def service(self, response_consumer, context, control):
return self._method.service(
response_consumer.consume_and_terminate, context, control)
class _MultiMethod(face_interfaces.MultiMethod):
def __init__(self, methods, control, pool):
self._methods = methods
self._control = control
self._pool = pool
def service(self, name, response_consumer, context):
method = self._methods.get(name, None)
if method is None:
raise exceptions.NoSuchMethodError(name)
elif self._pool is None:
return method(response_consumer, context, self._control)
else:
request_consumer = method(response_consumer, context, self._control)
return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
class _Assembly(
collections.namedtuple(
'_Assembly',
['methods', 'inlines', 'events', 'adaptations', 'messages'])):
"""An intermediate structure created when creating a TestServiceDigest."""
def _assemble(
scenarios, names, inline_method_constructor, event_method_constructor,
adapter, control, pool):
"""Creates an _Assembly from the given scenarios."""
methods = []
inlines = {}
events = {}
adaptations = {}
messages = {}
for name, scenario in scenarios.iteritems():
if name in names:
raise ValueError('Repeated name "%s"!' % name)
test_method = scenario[0]
inline_method = inline_method_constructor(test_method, control)
event_method = event_method_constructor(test_method, control, pool)
adaptation = adapter(test_method)
methods.append(test_method)
inlines[name] = inline_method
events[name] = event_method
adaptations[name] = adaptation
messages[name] = scenario[1]
return _Assembly(methods, inlines, events, adaptations, messages)
def digest(service, control, pool):
"""Creates a TestServiceDigest from a TestService.
Args:
service: A testing_service.TestService.
control: A testing_control.Control.
pool: If RPC methods should be serviced in a separate thread, a thread pool.
None if RPC methods should be serviced in the thread belonging to the
run-time that calls for their service.
Returns:
A TestServiceDigest synthesized from the given service.TestService.
"""
names = set()
unary_unary = _assemble(
service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod,
_EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
names.update(set(unary_unary.inlines))
unary_stream = _assemble(
service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod,
_EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
names.update(set(unary_stream.inlines))
stream_unary = _assemble(
service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod,
_EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
names.update(set(stream_unary.inlines))
stream_stream = _assemble(
service.stream_stream_scenarios(), names, _InlineStreamStreamMethod,
_EventStreamStreamMethod, _IDENTITY, control, pool)
names.update(set(stream_stream.inlines))
methods = list(unary_unary.methods)
methods.extend(unary_stream.methods)
methods.extend(stream_unary.methods)
methods.extend(stream_stream.methods)
adaptations = dict(unary_unary.adaptations)
adaptations.update(unary_stream.adaptations)
adaptations.update(stream_unary.adaptations)
adaptations.update(stream_stream.adaptations)
return TestServiceDigest(
service.name(),
methods,
unary_unary.inlines,
unary_stream.inlines,
stream_unary.inlines,
stream_stream.inlines,
unary_unary.events,
unary_stream.events,
stream_unary.events,
stream_stream.events,
_MultiMethod(adaptations, control, pool),
unary_unary.messages,
unary_stream.messages,
stream_unary.messages,
stream_stream.messages)

@ -0,0 +1,367 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A test to verify an implementation of the Face layer of RPC Framework."""
import abc
import unittest
from _framework.face import interfaces
from _framework.face.testing import callback as testing_callback
from _framework.face.testing import control
from _framework.face.testing import coverage
from _framework.face.testing import digest
from _framework.face.testing import stock_service
from _framework.face.testing import test_case
_TIMEOUT = 3
class EventInvocationSynchronousEventServiceTestCase(
test_case.FaceTestCase, coverage.FullCoverage):
"""A test of the Face layer of RPC Framework.
Concrete subclasses must also extend unittest.TestCase.
"""
__metaclass__ = abc.ABCMeta
def setUp(self):
"""See unittest.TestCase.setUp for full specification.
Overriding implementations must call this implementation.
"""
self.control = control.PauseFailControl()
self.digest = digest.digest(
stock_service.STOCK_TEST_SERVICE, self.control, None)
self.server, self.stub, self.memo = self.set_up_implementation(
self.digest.name, self.digest.methods,
{}, {}, {}, {},
self.digest.event_unary_unary_methods,
self.digest.event_unary_stream_methods,
self.digest.event_stream_unary_methods,
self.digest.event_stream_stream_methods,
None)
def tearDown(self):
"""See unittest.TestCase.tearDown for full specification.
Overriding implementations must call this implementation.
"""
self.tear_down_implementation(self.memo)
def testSuccessfulUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = testing_callback.Callback()
self.stub.event_value_in_value_out(
name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
response = callback.response()
test_messages.verify(request, response, self)
def testSuccessfulUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = testing_callback.Callback()
self.stub.event_value_in_stream_out(
name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated()
responses = callback.responses()
test_messages.verify(request, responses, self)
def testSuccessfulStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_value_out(
name, callback.complete, callback.abort, _TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
callback.block_until_terminated()
response = callback.response()
test_messages.verify(requests, response, self)
def testSuccessfulStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
name, callback, callback.abort, _TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
callback.block_until_terminated()
responses = callback.responses()
test_messages.verify(requests, responses, self)
def testSequentialInvocations(self):
# pylint: disable=cell-var-from-loop
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
first_request = test_messages.request()
second_request = test_messages.request()
first_callback = testing_callback.Callback()
second_callback = testing_callback.Callback()
def make_second_invocation(first_response):
first_callback.complete(first_response)
self.stub.event_value_in_value_out(
name, second_request, second_callback.complete,
second_callback.abort, _TIMEOUT)
self.stub.event_value_in_value_out(
name, first_request, make_second_invocation, first_callback.abort,
_TIMEOUT)
second_callback.block_until_terminated()
first_response = first_callback.response()
second_response = second_callback.response()
test_messages.verify(first_request, first_response, self)
test_messages.verify(second_request, second_response, self)
def testExpiredUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = testing_callback.Callback()
with self.control.pause():
self.stub.event_value_in_value_out(
name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion())
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = testing_callback.Callback()
with self.control.pause():
self.stub.event_value_in_stream_out(
name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion())
def testExpiredStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for unused_test_messages in test_messages_sequence:
callback = testing_callback.Callback()
self.stub.event_stream_in_value_out(
name, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion())
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
name, callback, callback.abort, _TIMEOUT)
for request in requests:
request_consumer.consume(request)
callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion())
def testFailedUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = testing_callback.Callback()
with self.control.fail():
self.stub.event_value_in_value_out(
name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = testing_callback.Callback()
with self.control.fail():
self.stub.event_value_in_stream_out(
name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = testing_callback.Callback()
with self.control.fail():
unused_call, request_consumer = self.stub.event_stream_in_value_out(
name, callback.complete, callback.abort, _TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = testing_callback.Callback()
with self.control.fail():
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
name, callback, callback.abort, _TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
def testParallelInvocations(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
first_request = test_messages.request()
first_callback = testing_callback.Callback()
second_request = test_messages.request()
second_callback = testing_callback.Callback()
self.stub.event_value_in_value_out(
name, first_request, first_callback.complete, first_callback.abort,
_TIMEOUT)
self.stub.event_value_in_value_out(
name, second_request, second_callback.complete,
second_callback.abort, _TIMEOUT)
first_callback.block_until_terminated()
second_callback.block_until_terminated()
first_response = first_callback.response()
second_response = second_callback.response()
test_messages.verify(first_request, first_response, self)
test_messages.verify(second_request, second_response, self)
@unittest.skip('TODO(nathaniel): implement.')
def testWaitingForSomeButNotAllParallelInvocations(self):
raise NotImplementedError()
def testCancelledUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = testing_callback.Callback()
with self.control.pause():
call = self.stub.event_value_in_value_out(
name, request, callback.complete, callback.abort, _TIMEOUT)
call.cancel()
callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion())
def testCancelledUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = testing_callback.Callback()
call = self.stub.event_value_in_stream_out(
name, request, callback, callback.abort, _TIMEOUT)
call.cancel()
callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion())
def testCancelledStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = testing_callback.Callback()
call, request_consumer = self.stub.event_stream_in_value_out(
name, callback.complete, callback.abort, _TIMEOUT)
for request in requests:
request_consumer.consume(request)
call.cancel()
callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion())
def testCancelledStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for unused_test_messages in test_messages_sequence:
callback = testing_callback.Callback()
call, unused_request_consumer = self.stub.event_stream_in_stream_out(
name, callback, callback.abort, _TIMEOUT)
call.cancel()
callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion())

@ -0,0 +1,377 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A test to verify an implementation of the Face layer of RPC Framework."""
import abc
import contextlib
import threading
import unittest
from _framework.face import exceptions
from _framework.face.testing import control
from _framework.face.testing import coverage
from _framework.face.testing import digest
from _framework.face.testing import stock_service
from _framework.face.testing import test_case
from _framework.foundation import future
from _framework.foundation import logging_pool
_TIMEOUT = 3
_MAXIMUM_POOL_SIZE = 100
class _PauseableIterator(object):
def __init__(self, upstream):
self._upstream = upstream
self._condition = threading.Condition()
self._paused = False
@contextlib.contextmanager
def pause(self):
with self._condition:
self._paused = True
yield
with self._condition:
self._paused = False
self._condition.notify_all()
def __iter__(self):
return self
def next(self):
with self._condition:
while self._paused:
self._condition.wait()
return next(self._upstream)
class FutureInvocationAsynchronousEventServiceTestCase(
test_case.FaceTestCase, coverage.FullCoverage):
"""A test of the Face layer of RPC Framework.
Concrete subclasses must also extend unittest.TestCase.
"""
__metaclass__ = abc.ABCMeta
def setUp(self):
"""See unittest.TestCase.setUp for full specification.
Overriding implementations must call this implementation.
"""
self.control = control.PauseFailControl()
self.digest_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
self.digest = digest.digest(
stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool)
self.server, self.stub, self.memo = self.set_up_implementation(
self.digest.name, self.digest.methods,
{}, {}, {}, {},
self.digest.event_unary_unary_methods,
self.digest.event_unary_stream_methods,
self.digest.event_stream_unary_methods,
self.digest.event_stream_stream_methods,
None)
def tearDown(self):
"""See unittest.TestCase.tearDown for full specification.
Overriding implementations must call this implementation.
"""
self.tear_down_implementation(self.memo)
self.digest_pool.shutdown(wait=True)
def testSuccessfulUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT)
response = response_future.outcome().return_value
test_messages.verify(request, response, self)
def testSuccessfulUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
def testSuccessfulStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
request_iterator = _PauseableIterator(iter(requests))
# Use of a paused iterator of requests allows us to test that control is
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_future = self.stub.future_stream_in_value_out(
name, request_iterator, _TIMEOUT)
response = response_future.outcome().return_value
test_messages.verify(requests, response, self)
def testSuccessfulStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
request_iterator = _PauseableIterator(iter(requests))
# Use of a paused iterator of requests allows us to test that control is
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
name, request_iterator, _TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
def testSequentialInvocations(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
first_request = test_messages.request()
second_request = test_messages.request()
first_response_future = self.stub.future_value_in_value_out(
name, first_request, _TIMEOUT)
first_response = first_response_future.outcome().return_value
test_messages.verify(first_request, first_response, self)
second_response_future = self.stub.future_value_in_value_out(
name, second_request, _TIMEOUT)
second_response = second_response_future.outcome().return_value
test_messages.verify(second_request, second_response, self)
def testExpiredUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.pause():
response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT)
outcome = response_future.outcome()
self.assertIsInstance(
outcome.exception, exceptions.ExpirationError)
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.pause():
response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT)
outcome = response_future.outcome()
self.assertIsInstance(
outcome.exception, exceptions.ExpirationError)
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.fail():
response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT)
outcome = response_future.outcome()
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is indistinguishable
# from simply not having called its response_callback before the
# expiration of the RPC.
self.assertIsInstance(outcome.exception, exceptions.ExpirationError)
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is indistinguishable
# from simply not having called its response_consumer before the
# expiration of the RPC.
with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.fail():
response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT)
outcome = response_future.outcome()
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is indistinguishable
# from simply not having called its response_callback before the
# expiration of the RPC.
self.assertIsInstance(outcome.exception, exceptions.ExpirationError)
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is indistinguishable
# from simply not having called its response_consumer before the
# expiration of the RPC.
with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
list(response_iterator)
def testParallelInvocations(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
first_request = test_messages.request()
second_request = test_messages.request()
first_response_future = self.stub.future_value_in_value_out(
name, first_request, _TIMEOUT)
second_response_future = self.stub.future_value_in_value_out(
name, second_request, _TIMEOUT)
first_response = first_response_future.outcome().return_value
second_response = second_response_future.outcome().return_value
test_messages.verify(first_request, first_response, self)
test_messages.verify(second_request, second_response, self)
@unittest.skip('TODO(nathaniel): implement.')
def testWaitingForSomeButNotAllParallelInvocations(self):
raise NotImplementedError()
def testCancelledUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.pause():
response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT)
cancelled = response_future.cancel()
self.assertFalse(cancelled)
self.assertEqual(future.ABORTED, response_future.outcome().category)
def testCancelledUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.unary_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self.control.pause():
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
response_iterator.cancel()
with self.assertRaises(exceptions.CancellationError):
next(response_iterator)
def testCancelledStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
self.digest.stream_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.pause():
response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT)
cancelled = response_future.cancel()
self.assertFalse(cancelled)
self.assertEqual(future.ABORTED, response_future.outcome().category)
def testCancelledStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
self.digest.stream_stream_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self.control.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
response_iterator.cancel()
with self.assertRaises(exceptions.CancellationError):
next(response_iterator)

@ -0,0 +1,117 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces implemented by data sets used in Face-layer tests."""
import abc
# cardinality is referenced from specification in this module.
from _framework.common import cardinality # pylint: disable=unused-import
class Method(object):
"""An RPC method to be used in tests of RPC implementations."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def name(self):
"""Identify the name of the method.
Returns:
The name of the method.
"""
raise NotImplementedError()
@abc.abstractmethod
def cardinality(self):
"""Identify the cardinality of the method.
Returns:
A cardinality.Cardinality value describing the streaming semantics of the
method.
"""
raise NotImplementedError()
@abc.abstractmethod
def request_class(self):
"""Identify the class used for the method's request objects.
Returns:
The class object of the class to which the method's request objects
belong.
"""
raise NotImplementedError()
@abc.abstractmethod
def response_class(self):
"""Identify the class used for the method's response objects.
Returns:
The class object of the class to which the method's response objects
belong.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_request(self, request):
"""Serialize the given request object.
Args:
request: A request object appropriate for this method.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_request(self, serialized_request):
"""Synthesize a request object from a given bytestring.
Args:
serialized_request: A bytestring deserializable into a request object
appropriate for this method.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_response(self, response):
"""Serialize the given response object.
Args:
response: A response object appropriate for this method.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_response(self, serialized_response):
"""Synthesize a response object from a given bytestring.
Args:
serialized_response: A bytestring deserializable into a response object
appropriate for this method.
"""
raise NotImplementedError()

@ -0,0 +1,70 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utility for serialization in the context of test RPC services."""
import collections
class Serialization(
collections.namedtuple(
'_Serialization',
['request_serializers',
'request_deserializers',
'response_serializers',
'response_deserializers'])):
"""An aggregation of serialization behaviors for an RPC service.
Attributes:
request_serializers: A dict from method name to request object serializer
behavior.
request_deserializers: A dict from method name to request object
deserializer behavior.
response_serializers: A dict from method name to response object serializer
behavior.
response_deserializers: A dict from method name to response object
deserializer behavior.
"""
def serialization(methods):
"""Creates a Serialization from a sequences of interfaces.Method objects."""
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for method in methods:
name = method.name()
request_serializers[name] = method.serialize_request
request_deserializers[name] = method.deserialize_request
response_serializers[name] = method.serialize_response
response_deserializers[name] = method.deserialize_response
return Serialization(
request_serializers, request_deserializers, response_serializers,
response_deserializers)

@ -0,0 +1,337 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Private interfaces implemented by data sets used in Face-layer tests."""
import abc
# interfaces is referenced from specification in this module.
from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import
from _framework.face.testing import interfaces
class UnaryUnaryTestMethod(interfaces.Method):
"""Like face_interfaces.EventValueInValueOutMethod but with a control."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, response_callback, context, control):
"""Services an RPC that accepts one message and produces one message.
Args:
request: The single request message for the RPC.
response_callback: A callback to be called to accept the response message
of the RPC.
context: An face_interfaces.RpcContext object.
control: A test_control.Control to control execution of this method.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class UnaryUnaryTestMessages(object):
"""A type for unary-request-unary-response message pairings."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def request(self):
"""Affords a request message.
Implementations of this method should return a different message with each
call so that multiple test executions of the test method may be made with
different inputs.
Returns:
A request message.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify(self, request, response, test_case):
"""Verifies that the computed response matches the given request.
Args:
request: A request message.
response: A response message.
test_case: A unittest.TestCase object affording useful assertion methods.
Raises:
AssertionError: If the request and response do not match, indicating that
there was some problem executing the RPC under test.
"""
raise NotImplementedError()
class UnaryStreamTestMethod(interfaces.Method):
"""Like face_interfaces.EventValueInStreamOutMethod but with a control."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, request, response_consumer, context, control):
"""Services an RPC that takes one message and produces a stream of messages.
Args:
request: The single request message for the RPC.
response_consumer: A stream.Consumer to be called to accept the response
messages of the RPC.
context: An RpcContext object.
control: A test_control.Control to control execution of this method.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class UnaryStreamTestMessages(object):
"""A type for unary-request-stream-response message pairings."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def request(self):
"""Affords a request message.
Implementations of this method should return a different message with each
call so that multiple test executions of the test method may be made with
different inputs.
Returns:
A request message.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify(self, request, responses, test_case):
"""Verifies that the computed responses match the given request.
Args:
request: A request message.
responses: A sequence of response messages.
test_case: A unittest.TestCase object affording useful assertion methods.
Raises:
AssertionError: If the request and responses do not match, indicating that
there was some problem executing the RPC under test.
"""
raise NotImplementedError()
class StreamUnaryTestMethod(interfaces.Method):
"""Like face_interfaces.EventStreamInValueOutMethod but with a control."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, response_callback, context, control):
"""Services an RPC that takes a stream of messages and produces one message.
Args:
response_callback: A callback to be called to accept the response message
of the RPC.
context: An RpcContext object.
control: A test_control.Control to control execution of this method.
Returns:
A stream.Consumer with which to accept the request messages of the RPC.
The consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing messages to this object. Implementations must not assume that
this object will be called to completion of the request stream or even
called at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class StreamUnaryTestMessages(object):
"""A type for stream-request-unary-response message pairings."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def requests(self):
"""Affords a sequence of request messages.
Implementations of this method should return a different sequences with each
call so that multiple test executions of the test method may be made with
different inputs.
Returns:
A sequence of request messages.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify(self, requests, response, test_case):
"""Verifies that the computed response matches the given requests.
Args:
requests: A sequence of request messages.
response: A response message.
test_case: A unittest.TestCase object affording useful assertion methods.
Raises:
AssertionError: If the requests and response do not match, indicating that
there was some problem executing the RPC under test.
"""
raise NotImplementedError()
class StreamStreamTestMethod(interfaces.Method):
"""Like face_interfaces.EventStreamInStreamOutMethod but with a control."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, response_consumer, context, control):
"""Services an RPC that accepts and produces streams of messages.
Args:
response_consumer: A stream.Consumer to be called to accept the response
messages of the RPC.
context: An RpcContext object.
control: A test_control.Control to control execution of this method.
Returns:
A stream.Consumer with which to accept the request messages of the RPC.
The consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing messages to this object. Implementations must not assume that
this object will be called to completion of the request stream or even
called at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class StreamStreamTestMessages(object):
"""A type for stream-request-stream-response message pairings."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def requests(self):
"""Affords a sequence of request messages.
Implementations of this method should return a different sequences with each
call so that multiple test executions of the test method may be made with
different inputs.
Returns:
A sequence of request messages.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify(self, requests, responses, test_case):
"""Verifies that the computed response matches the given requests.
Args:
requests: A sequence of request messages.
responses: A sequence of response messages.
test_case: A unittest.TestCase object affording useful assertion methods.
Raises:
AssertionError: If the requests and responses do not match, indicating
that there was some problem executing the RPC under test.
"""
raise NotImplementedError()
class TestService(object):
"""A specification of implemented RPC methods to use in tests."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def name(self):
"""Identifies the RPC service name used during the test.
Returns:
The RPC service name to be used for the test.
"""
raise NotImplementedError()
@abc.abstractmethod
def unary_unary_scenarios(self):
"""Affords unary-request-unary-response test methods and their messages.
Returns:
A dict from method name to pair. The first element of the pair
is a UnaryUnaryTestMethod object and the second element is a sequence
of UnaryUnaryTestMethodMessages objects.
"""
raise NotImplementedError()
@abc.abstractmethod
def unary_stream_scenarios(self):
"""Affords unary-request-stream-response test methods and their messages.
Returns:
A dict from method name to pair. The first element of the pair is a
UnaryStreamTestMethod object and the second element is a sequence of
UnaryStreamTestMethodMessages objects.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_unary_scenarios(self):
"""Affords stream-request-unary-response test methods and their messages.
Returns:
A dict from method name to pair. The first element of the pair is a
StreamUnaryTestMethod object and the second element is a sequence of
StreamUnaryTestMethodMessages objects.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_stream_scenarios(self):
"""Affords stream-request-stream-response test methods and their messages.
Returns:
A dict from method name to pair. The first element of the pair is a
StreamStreamTestMethod object and the second element is a sequence of
StreamStreamTestMethodMessages objects.
"""
raise NotImplementedError()

@ -0,0 +1,374 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Examples of Python implementations of the stock.proto Stock service."""
from _framework.common import cardinality
from _framework.face.testing import service
from _framework.foundation import abandonment
from _framework.foundation import stream
from _framework.foundation import stream_util
from _junkdrawer import stock_pb2
SYMBOL_FORMAT = 'test symbol:%03d'
STREAM_LENGTH = 400
# A test-appropriate security-pricing function. :-P
_price = lambda symbol_name: float(hash(symbol_name) % 4096)
def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
"""A unary-request, unary-response test method."""
control.control()
if active():
stock_reply_callback(
stock_pb2.StockReply(
symbol=stock_request.symbol, price=_price(stock_request.symbol)))
else:
raise abandonment.Abandoned()
def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
"""A stream-request, stream-response test method."""
def stock_reply_for_stock_request(stock_request):
control.control()
if active():
return stock_pb2.StockReply(
symbol=stock_request.symbol, price=_price(stock_request.symbol))
else:
raise abandonment.Abandoned()
return stream_util.TransformingConsumer(
stock_reply_for_stock_request, stock_reply_consumer)
def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
"""A unary-request, stream-response test method."""
base_price = _price(stock_request.symbol)
for index in range(stock_request.num_trades_to_watch):
control.control()
if active():
stock_reply_consumer.consume(
stock_pb2.StockReply(
symbol=stock_request.symbol, price=base_price + index))
else:
raise abandonment.Abandoned()
stock_reply_consumer.terminate()
def _get_highest_trade_price(stock_reply_callback, control, active):
"""A stream-request, unary-response test method."""
class StockRequestConsumer(stream.Consumer):
"""Keeps an ongoing record of the most valuable symbol yet consumed."""
def __init__(self):
self._symbol = None
self._price = None
def consume(self, stock_request):
control.control()
if active():
if self._price is None:
self._symbol = stock_request.symbol
self._price = _price(stock_request.symbol)
else:
candidate_price = _price(stock_request.symbol)
if self._price < candidate_price:
self._symbol = stock_request.symbol
self._price = candidate_price
def terminate(self):
control.control()
if active():
if self._symbol is None:
raise ValueError()
else:
stock_reply_callback(
stock_pb2.StockReply(symbol=self._symbol, price=self._price))
self._symbol = None
self._price = None
def consume_and_terminate(self, stock_request):
control.control()
if active():
if self._price is None:
stock_reply_callback(
stock_pb2.StockReply(
symbol=stock_request.symbol,
price=_price(stock_request.symbol)))
else:
candidate_price = _price(stock_request.symbol)
if self._price < candidate_price:
stock_reply_callback(
stock_pb2.StockReply(
symbol=stock_request.symbol, price=candidate_price))
else:
stock_reply_callback(
stock_pb2.StockReply(
symbol=self._symbol, price=self._price))
self._symbol = None
self._price = None
return StockRequestConsumer()
class GetLastTradePrice(service.UnaryUnaryTestMethod):
"""GetLastTradePrice for use in tests."""
def name(self):
return 'GetLastTradePrice'
def cardinality(self):
return cardinality.Cardinality.UNARY_UNARY
def request_class(self):
return stock_pb2.StockRequest
def response_class(self):
return stock_pb2.StockReply
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, serialized_request):
return stock_pb2.StockRequest.FromString(serialized_request)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, serialized_response):
return stock_pb2.StockReply.FromString(serialized_response)
def service(self, request, response_callback, context, control):
_get_last_trade_price(
request, response_callback, control, context.is_active)
class GetLastTradePriceMessages(service.UnaryUnaryTestMessages):
def __init__(self):
self._index = 0
def request(self):
symbol = SYMBOL_FORMAT % self._index
self._index += 1
return stock_pb2.StockRequest(symbol=symbol)
def verify(self, request, response, test_case):
test_case.assertEqual(request.symbol, response.symbol)
test_case.assertEqual(_price(request.symbol), response.price)
class GetLastTradePriceMultiple(service.StreamStreamTestMethod):
"""GetLastTradePriceMultiple for use in tests."""
def name(self):
return 'GetLastTradePriceMultiple'
def cardinality(self):
return cardinality.Cardinality.STREAM_STREAM
def request_class(self):
return stock_pb2.StockRequest
def response_class(self):
return stock_pb2.StockReply
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, serialized_request):
return stock_pb2.StockRequest.FromString(serialized_request)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, serialized_response):
return stock_pb2.StockReply.FromString(serialized_response)
def service(self, response_consumer, context, control):
return _get_last_trade_price_multiple(
response_consumer, control, context.is_active)
class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages):
"""Pairs of message streams for use with GetLastTradePriceMultiple."""
def __init__(self):
self._index = 0
def requests(self):
base_index = self._index
self._index += 1
return [
stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % (base_index + index))
for index in range(STREAM_LENGTH)]
def verify(self, requests, responses, test_case):
test_case.assertEqual(len(requests), len(responses))
for stock_request, stock_reply in zip(requests, responses):
test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
class WatchFutureTrades(service.UnaryStreamTestMethod):
"""WatchFutureTrades for use in tests."""
def name(self):
return 'WatchFutureTrades'
def cardinality(self):
return cardinality.Cardinality.UNARY_STREAM
def request_class(self):
return stock_pb2.StockRequest
def response_class(self):
return stock_pb2.StockReply
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, serialized_request):
return stock_pb2.StockRequest.FromString(serialized_request)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, serialized_response):
return stock_pb2.StockReply.FromString(serialized_response)
def service(self, request, response_consumer, context, control):
_watch_future_trades(request, response_consumer, control, context.is_active)
class WatchFutureTradesMessages(service.UnaryStreamTestMessages):
"""Pairs of a single request message and a sequence of response messages."""
def __init__(self):
self._index = 0
def request(self):
symbol = SYMBOL_FORMAT % self._index
self._index += 1
return stock_pb2.StockRequest(
symbol=symbol, num_trades_to_watch=STREAM_LENGTH)
def verify(self, request, responses, test_case):
test_case.assertEqual(STREAM_LENGTH, len(responses))
base_price = _price(request.symbol)
for index, response in enumerate(responses):
test_case.assertEqual(base_price + index, response.price)
class GetHighestTradePrice(service.StreamUnaryTestMethod):
"""GetHighestTradePrice for use in tests."""
def name(self):
return 'GetHighestTradePrice'
def cardinality(self):
return cardinality.Cardinality.STREAM_UNARY
def request_class(self):
return stock_pb2.StockRequest
def response_class(self):
return stock_pb2.StockReply
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, serialized_request):
return stock_pb2.StockRequest.FromString(serialized_request)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, serialized_response):
return stock_pb2.StockReply.FromString(serialized_response)
def service(self, response_callback, context, control):
return _get_highest_trade_price(
response_callback, control, context.is_active)
class GetHighestTradePriceMessages(service.StreamUnaryTestMessages):
def requests(self):
return [
stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % index)
for index in range(STREAM_LENGTH)]
def verify(self, requests, response, test_case):
price = None
symbol = None
for stock_request in requests:
current_symbol = stock_request.symbol
current_price = _price(current_symbol)
if price is None or price < current_price:
price = current_price
symbol = current_symbol
test_case.assertEqual(price, response.price)
test_case.assertEqual(symbol, response.symbol)
class StockTestService(service.TestService):
"""A corpus of test data with one method of each RPC cardinality."""
def name(self):
return 'Stock'
def unary_unary_scenarios(self):
return {
'GetLastTradePrice': (
GetLastTradePrice(), [GetLastTradePriceMessages()]),
}
def unary_stream_scenarios(self):
return {
'WatchFutureTrades': (
WatchFutureTrades(), [WatchFutureTradesMessages()]),
}
def stream_unary_scenarios(self):
return {
'GetHighestTradePrice': (
GetHighestTradePrice(), [GetHighestTradePriceMessages()])
}
def stream_stream_scenarios(self):
return {
'GetLastTradePriceMultiple': (
GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]),
}
STOCK_TEST_SERVICE = StockTestService()

@ -0,0 +1,111 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tools for creating tests of implementations of the Face layer."""
import abc
# face_interfaces and interfaces are referenced in specification in this module.
from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import
from _framework.face.testing import interfaces # pylint: disable=unused-import
class FaceTestCase(object):
"""Describes a test of the Face Layer of RPC Framework.
Concrete subclasses must also inherit from unittest.TestCase and from at least
one class that defines test methods.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def set_up_implementation(
self,
name,
methods,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods,
multi_method):
"""Instantiates the Face Layer implementation under test.
Args:
name: The service name to be used in the test.
methods: A sequence of interfaces.Method objects describing the RPC
methods that will be called during the test.
inline_value_in_value_out_methods: A dictionary from string method names
to face_interfaces.InlineValueInValueOutMethod implementations of those
methods.
inline_value_in_stream_out_methods: A dictionary from string method names
to face_interfaces.InlineValueInStreamOutMethod implementations of those
methods.
inline_stream_in_value_out_methods: A dictionary from string method names
to face_interfaces.InlineStreamInValueOutMethod implementations of those
methods.
inline_stream_in_stream_out_methods: A dictionary from string method names
to face_interfaces.InlineStreamInStreamOutMethod implementations of
those methods.
event_value_in_value_out_methods: A dictionary from string method names
to face_interfaces.EventValueInValueOutMethod implementations of those
methods.
event_value_in_stream_out_methods: A dictionary from string method names
to face_interfaces.EventValueInStreamOutMethod implementations of those
methods.
event_stream_in_value_out_methods: A dictionary from string method names
to face_interfaces.EventStreamInValueOutMethod implementations of those
methods.
event_stream_in_stream_out_methods: A dictionary from string method names
to face_interfaces.EventStreamInStreamOutMethod implementations of those
methods.
multi_method: An face_interfaces.MultiMethod, or None.
Returns:
A sequence of length three the first element of which is a
face_interfaces.Server, the second element of which is a
face_interfaces.Stub, (both of which are backed by the given method
implementations), and the third element of which is an arbitrary memo
object to be kept and passed to tearDownImplementation at the conclusion
of the test.
"""
raise NotImplementedError()
@abc.abstractmethod
def tear_down_implementation(self, memo):
"""Destroys the Face layer implementation under test.
Args:
memo: The object from the third position of the return value of
set_up_implementation.
"""
raise NotImplementedError()

@ -0,0 +1,152 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# TODO(nathaniel): Remove this from source control after having made
# generation from the stock.proto source part of GRPC's build-and-test
# process.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: stock.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='stock.proto',
package='stock',
serialized_pb=_b('\n\x0bstock.proto\x12\x05stock\">\n\x0cStockRequest\x12\x0e\n\x06symbol\x18\x01 \x01(\t\x12\x1e\n\x13num_trades_to_watch\x18\x02 \x01(\x05:\x01\x30\"+\n\nStockReply\x12\r\n\x05price\x18\x01 \x01(\x02\x12\x0e\n\x06symbol\x18\x02 \x01(\t2\x96\x02\n\x05Stock\x12=\n\x11GetLastTradePrice\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00\x12I\n\x19GetLastTradePriceMultiple\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00(\x01\x30\x01\x12?\n\x11WatchFutureTrades\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00\x30\x01\x12\x42\n\x14GetHighestTradePrice\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00(\x01')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_STOCKREQUEST = _descriptor.Descriptor(
name='StockRequest',
full_name='stock.StockRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='symbol', full_name='stock.StockRequest.symbol', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='num_trades_to_watch', full_name='stock.StockRequest.num_trades_to_watch', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=True, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=22,
serialized_end=84,
)
_STOCKREPLY = _descriptor.Descriptor(
name='StockReply',
full_name='stock.StockReply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='price', full_name='stock.StockReply.price', index=0,
number=1, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='symbol', full_name='stock.StockReply.symbol', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=86,
serialized_end=129,
)
DESCRIPTOR.message_types_by_name['StockRequest'] = _STOCKREQUEST
DESCRIPTOR.message_types_by_name['StockReply'] = _STOCKREPLY
StockRequest = _reflection.GeneratedProtocolMessageType('StockRequest', (_message.Message,), dict(
DESCRIPTOR = _STOCKREQUEST,
__module__ = 'stock_pb2'
# @@protoc_insertion_point(class_scope:stock.StockRequest)
))
_sym_db.RegisterMessage(StockRequest)
StockReply = _reflection.GeneratedProtocolMessageType('StockReply', (_message.Message,), dict(
DESCRIPTOR = _STOCKREPLY,
__module__ = 'stock_pb2'
# @@protoc_insertion_point(class_scope:stock.StockReply)
))
_sym_db.RegisterMessage(StockReply)
# @@protoc_insertion_point(module_scope)

@ -6,5 +6,6 @@ set -ex
cd $(dirname $0)/../..
root=`pwd`
python2.7_virtual_environment/bin/python2.7 -B -m unittest discover -s src/python -p '*.py'
python3.4 -B -m unittest discover -s src/python -p '*.py'
PYTHONPATH=third_party/protobuf/python python2.7_virtual_environment/bin/python2.7 -B -m unittest discover -s src/python -p '*.py'
# TODO(nathaniel): Get this working again (requires 3.X-friendly protobuf)
# python3.4 -B -m unittest discover -s src/python -p '*.py'

Loading…
Cancel
Save