Fill out the foundation package.

pull/193/head
Nathaniel Manista 10 years ago
parent fcd6c0c922
commit b68f3d1746
  1. 145
      src/python/_framework/foundation/_later_test.py
  2. 156
      src/python/_framework/foundation/_timer_future.py
  3. 38
      src/python/_framework/foundation/abandonment.py
  4. 78
      src/python/_framework/foundation/callable_util.py
  5. 172
      src/python/_framework/foundation/future.py
  6. 51
      src/python/_framework/foundation/later.py
  7. 60
      src/python/_framework/foundation/stream.py
  8. 73
      src/python/_framework/foundation/stream_testing.py
  9. 160
      src/python/_framework/foundation/stream_util.py

@ -0,0 +1,145 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of the later module."""
import threading
import time
import unittest
from _framework.foundation import future
from _framework.foundation import later
TICK = 0.1
class LaterTest(unittest.TestCase):
def test_simple_delay(self):
lock = threading.Lock()
cell = [0]
def increment_cell():
with lock:
cell[0] += 1
computation_future = later.later(TICK * 2, increment_cell)
self.assertFalse(computation_future.done())
self.assertFalse(computation_future.cancelled())
time.sleep(TICK)
self.assertFalse(computation_future.done())
self.assertFalse(computation_future.cancelled())
with lock:
self.assertEqual(0, cell[0])
time.sleep(TICK * 2)
self.assertTrue(computation_future.done())
self.assertFalse(computation_future.cancelled())
with lock:
self.assertEqual(1, cell[0])
outcome = computation_future.outcome()
self.assertEqual(future.RETURNED, outcome.category)
def test_callback(self):
lock = threading.Lock()
cell = [0]
callback_called = [False]
outcome_passed_to_callback = [None]
def increment_cell():
with lock:
cell[0] += 1
computation_future = later.later(TICK * 2, increment_cell)
def callback(outcome):
with lock:
callback_called[0] = True
outcome_passed_to_callback[0] = outcome
computation_future.add_done_callback(callback)
time.sleep(TICK)
with lock:
self.assertFalse(callback_called[0])
time.sleep(TICK * 2)
with lock:
self.assertTrue(callback_called[0])
self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category)
callback_called[0] = False
outcome_passed_to_callback[0] = None
computation_future.add_done_callback(callback)
with lock:
self.assertTrue(callback_called[0])
self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category)
def test_cancel(self):
lock = threading.Lock()
cell = [0]
callback_called = [False]
outcome_passed_to_callback = [None]
def increment_cell():
with lock:
cell[0] += 1
computation_future = later.later(TICK * 2, increment_cell)
def callback(outcome):
with lock:
callback_called[0] = True
outcome_passed_to_callback[0] = outcome
computation_future.add_done_callback(callback)
time.sleep(TICK)
with lock:
self.assertFalse(callback_called[0])
computation_future.cancel()
self.assertTrue(computation_future.cancelled())
self.assertFalse(computation_future.done())
self.assertEqual(future.ABORTED, computation_future.outcome().category)
with lock:
self.assertTrue(callback_called[0])
self.assertEqual(future.ABORTED, outcome_passed_to_callback[0].category)
def test_outcome(self):
lock = threading.Lock()
cell = [0]
callback_called = [False]
outcome_passed_to_callback = [None]
def increment_cell():
with lock:
cell[0] += 1
computation_future = later.later(TICK * 2, increment_cell)
def callback(outcome):
with lock:
callback_called[0] = True
outcome_passed_to_callback[0] = outcome
computation_future.add_done_callback(callback)
returned_outcome = computation_future.outcome()
self.assertEqual(future.RETURNED, returned_outcome.category)
# The callback may not yet have been called! Sleep a tick.
time.sleep(TICK)
with lock:
self.assertTrue(callback_called[0])
self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,156 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Affords a Future implementation based on Python's threading.Timer."""
import threading
import time
from _framework.foundation import future
class TimerFuture(future.Future):
"""A Future implementation based around Timer objects."""
def __init__(self, compute_time, computation):
"""Constructor.
Args:
compute_time: The time after which to begin this future's computation.
computation: The computation to be performed within this Future.
"""
self._lock = threading.Lock()
self._compute_time = compute_time
self._computation = computation
self._timer = None
self._computing = False
self._computed = False
self._cancelled = False
self._outcome = None
self._waiting = []
def _compute(self):
"""Performs the computation embedded in this Future.
Or doesn't, if the time to perform it has not yet arrived.
"""
with self._lock:
time_remaining = self._compute_time - time.time()
if 0 < time_remaining:
self._timer = threading.Timer(time_remaining, self._compute)
self._timer.start()
return
else:
self._computing = True
try:
returned_value = self._computation()
outcome = future.returned(returned_value)
except Exception as e: # pylint: disable=broad-except
outcome = future.raised(e)
with self._lock:
self._computing = False
self._computed = True
self._outcome = outcome
waiting = self._waiting
for callback in waiting:
callback(outcome)
def start(self):
"""Starts this Future.
This must be called exactly once, immediately after construction.
"""
with self._lock:
self._timer = threading.Timer(
self._compute_time - time.time(), self._compute)
self._timer.start()
def cancel(self):
"""See future.Future.cancel for specification."""
with self._lock:
if self._computing or self._computed:
return False
elif self._cancelled:
return True
else:
self._timer.cancel()
self._cancelled = True
self._outcome = future.aborted()
outcome = self._outcome
waiting = self._waiting
for callback in waiting:
try:
callback(outcome)
except Exception: # pylint: disable=broad-except
pass
return True
def cancelled(self):
"""See future.Future.cancelled for specification."""
with self._lock:
return self._cancelled
def done(self):
"""See future.Future.done for specification."""
with self._lock:
return self._computed
def outcome(self):
"""See future.Future.outcome for specification."""
with self._lock:
if self._computed or self._cancelled:
return self._outcome
condition = threading.Condition()
def notify_condition(unused_outcome):
with condition:
condition.notify()
self._waiting.append(notify_condition)
with condition:
condition.wait()
with self._lock:
return self._outcome
def add_done_callback(self, callback):
"""See future.Future.add_done_callback for specification."""
with self._lock:
if not self._computed and not self._cancelled:
self._waiting.append(callback)
return
else:
outcome = self._outcome
callback(outcome)

@ -0,0 +1,38 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for indicating abandonment of computation."""
class Abandoned(Exception):
"""Indicates that some computation is being abandoned.
Abandoning a computation is different than returning a value or raising
an exception indicating some operational or programming defect.
"""

@ -0,0 +1,78 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for working with callables."""
import functools
import logging
from _framework.foundation import future
def _call_logging_exceptions(behavior, message, *args, **kwargs):
try:
return future.returned(behavior(*args, **kwargs))
except Exception as e: # pylint: disable=broad-except
logging.exception(message)
return future.raised(e)
def with_exceptions_logged(behavior, message):
"""Wraps a callable in a try-except that logs any exceptions it raises.
Args:
behavior: Any callable.
message: A string to log if the behavior raises an exception.
Returns:
A callable that when executed invokes the given behavior. The returned
callable takes the same arguments as the given behavior but returns a
future.Outcome describing whether the given behavior returned a value or
raised an exception.
"""
@functools.wraps(behavior)
def wrapped_behavior(*args, **kwargs):
return _call_logging_exceptions(behavior, message, *args, **kwargs)
return wrapped_behavior
def call_logging_exceptions(behavior, message, *args, **kwargs):
"""Calls a behavior in a try-except that logs any exceptions it raises.
Args:
behavior: Any callable.
message: A string to log if the behavior raises an exception.
*args: Positional arguments to pass to the given behavior.
**kwargs: Keyword arguments to pass to the given behavior.
Returns:
A future.Outcome describing whether the given behavior returned a value or
raised an exception.
"""
return _call_logging_exceptions(behavior, message, *args, **kwargs)

@ -0,0 +1,172 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The Future interface missing from Python's standard library.
Python's concurrent.futures library defines a Future class very much like the
Future defined here, but since that class is concrete and without construction
semantics it is only available within the concurrent.futures library itself.
The Future class defined here is an entirely abstract interface that anyone may
implement and use.
"""
import abc
import collections
RETURNED = object()
RAISED = object()
ABORTED = object()
class Outcome(object):
"""A sum type describing the outcome of some computation.
Attributes:
category: One of RETURNED, RAISED, or ABORTED, respectively indicating
that the computation returned a value, raised an exception, or was
aborted.
return_value: The value returned by the computation. Must be present if
category is RETURNED.
exception: The exception raised by the computation. Must be present if
category is RAISED.
"""
__metaclass__ = abc.ABCMeta
class _EasyOutcome(
collections.namedtuple('_EasyOutcome',
['category', 'return_value', 'exception']),
Outcome):
"""A trivial implementation of Outcome."""
# All Outcomes describing abortion are indistinguishable so there might as well
# be only one.
_ABORTED_OUTCOME = _EasyOutcome(ABORTED, None, None)
def aborted():
"""Returns an Outcome indicating that a computation was aborted.
Returns:
An Outcome indicating that a computation was aborted.
"""
return _ABORTED_OUTCOME
def raised(exception):
"""Returns an Outcome indicating that a computation raised an exception.
Args:
exception: The exception raised by the computation.
Returns:
An Outcome indicating that a computation raised the given exception.
"""
return _EasyOutcome(RAISED, None, exception)
def returned(value):
"""Returns an Outcome indicating that a computation returned a value.
Args:
value: The value returned by the computation.
Returns:
An Outcome indicating that a computation returned the given value.
"""
return _EasyOutcome(RETURNED, value, None)
class Future(object):
"""A representation of a computation happening in another control flow.
Computations represented by a Future may have already completed, may be
ongoing, or may be yet to be begun.
Computations represented by a Future are considered uninterruptable; once
started they will be allowed to terminate either by returning or raising
an exception.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def cancel(self):
"""Attempts to cancel the computation.
Returns:
True if the computation will not be allowed to take place or False if
the computation has already taken place or is currently taking place.
"""
raise NotImplementedError()
@abc.abstractmethod
def cancelled(self):
"""Describes whether the computation was cancelled.
Returns:
True if the computation was cancelled and did not take place or False
if the computation took place, is taking place, or is scheduled to
take place in the future.
"""
raise NotImplementedError()
@abc.abstractmethod
def done(self):
"""Describes whether the computation has taken place.
Returns:
True if the computation took place; False otherwise.
"""
raise NotImplementedError()
@abc.abstractmethod
def outcome(self):
"""Accesses the outcome of the computation.
If the computation has not yet completed, this method blocks until it has.
Returns:
An Outcome describing the outcome of the computation.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_done_callback(self, callback):
"""Adds a function to be called at completion of the computation.
The callback will be passed an Outcome object describing the outcome of
the computation.
If the computation has already completed, the callback will be called
immediately.
Args:
callback: A callable taking an Outcome as its single parameter.
"""
raise NotImplementedError()

@ -0,0 +1,51 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Enables scheduling execution at a later time."""
import time
from _framework.foundation import _timer_future
def later(delay, computation):
"""Schedules later execution of a callable.
Args:
delay: Any numeric value. Represents the minimum length of time in seconds
to allow to pass before beginning the computation. No guarantees are made
about the maximum length of time that will pass.
computation: A callable that accepts no arguments.
Returns:
A Future representing the scheduled computation.
"""
timer_future = _timer_future.TimerFuture(time.time() + delay, computation)
timer_future.start()
return timer_future

@ -0,0 +1,60 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces related to streams of values or objects."""
import abc
class Consumer(object):
"""Interface for consumers of finite streams of values or objects."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def consume(self, value):
"""Accepts a value.
Args:
value: Any value accepted by this Consumer.
"""
raise NotImplementedError()
@abc.abstractmethod
def terminate(self):
"""Indicates to this Consumer that no more values will be supplied."""
raise NotImplementedError()
@abc.abstractmethod
def consume_and_terminate(self, value):
"""Supplies a value and signals that no more values will be supplied.
Args:
value: Any value accepted by this Consumer.
"""
raise NotImplementedError()

@ -0,0 +1,73 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for testing stream-related code."""
from _framework.foundation import stream
class TestConsumer(stream.Consumer):
"""A stream.Consumer instrumented for testing.
Attributes:
calls: A sequence of value-termination pairs describing the history of calls
made on this object.
"""
def __init__(self):
self.calls = []
def consume(self, value):
"""See stream.Consumer.consume for specification."""
self.calls.append((value, False))
def terminate(self):
"""See stream.Consumer.terminate for specification."""
self.calls.append((None, True))
def consume_and_terminate(self, value):
"""See stream.Consumer.consume_and_terminate for specification."""
self.calls.append((value, True))
def is_legal(self):
"""Reports whether or not a legal sequence of calls has been made."""
terminated = False
for value, terminal in self.calls:
if terminated:
return False
elif terminal:
terminated = True
elif value is None:
return False
else: # pylint: disable=useless-else-on-loop
return True
def values(self):
"""Returns the sequence of values that have been passed to this Consumer."""
return [value for value, _ in self.calls if value]

@ -0,0 +1,160 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Helpful utilities related to the stream module."""
import logging
import threading
from _framework.foundation import stream
_NO_VALUE = object()
class TransformingConsumer(stream.Consumer):
"""A stream.Consumer that passes a transformation of its input to another."""
def __init__(self, transformation, downstream):
self._transformation = transformation
self._downstream = downstream
def consume(self, value):
self._downstream.consume(self._transformation(value))
def terminate(self):
self._downstream.terminate()
def consume_and_terminate(self, value):
self._downstream.consume_and_terminate(self._transformation(value))
class IterableConsumer(stream.Consumer):
"""A Consumer that when iterated over emits the values it has consumed."""
def __init__(self):
self._condition = threading.Condition()
self._values = []
self._active = True
def consume(self, stock_reply):
with self._condition:
if self._active:
self._values.append(stock_reply)
self._condition.notify()
def terminate(self):
with self._condition:
self._active = False
self._condition.notify()
def consume_and_terminate(self, stock_reply):
with self._condition:
if self._active:
self._values.append(stock_reply)
self._active = False
self._condition.notify()
def __iter__(self):
return self
def next(self):
with self._condition:
while self._active and not self._values:
self._condition.wait()
if self._values:
return self._values.pop(0)
else:
raise StopIteration()
class ThreadSwitchingConsumer(stream.Consumer):
"""A Consumer decorator that affords serialization and asynchrony."""
def __init__(self, sink, pool):
self._lock = threading.Lock()
self._sink = sink
self._pool = pool
# True if self._spin has been submitted to the pool to be called once and
# that call has not yet returned, False otherwise.
self._spinning = False
self._values = []
self._active = True
def _spin(self, sink, value, terminate):
while True:
try:
if value is _NO_VALUE:
sink.terminate()
elif terminate:
sink.consume_and_terminate(value)
else:
sink.consume(value)
except Exception as e: # pylint:disable=broad-except
logging.exception(e)
with self._lock:
if terminate:
self._spinning = False
return
elif self._values:
value = self._values.pop(0)
terminate = not self._values and not self._active
elif not self._active:
value = _NO_VALUE
terminate = True
else:
self._spinning = False
return
def consume(self, value):
with self._lock:
if self._active:
if self._spinning:
self._values.append(value)
else:
self._pool.submit(self._spin, self._sink, value, False)
self._spinning = True
def terminate(self):
with self._lock:
if self._active:
self._active = False
if not self._spinning:
self._pool.submit(self._spin, self._sink, _NO_VALUE, True)
self._spinning = True
def consume_and_terminate(self, value):
with self._lock:
if self._active:
self._active = False
if self._spinning:
self._values.append(value)
else:
self._pool.submit(self._spin, self._sink, value, True)
self._spinning = True
Loading…
Cancel
Save