Add the _framework.base package.

pull/193/head
Nathaniel Manista 10 years ago
parent 61b5a81a1e
commit 21f29c6bf7
  1. 0
      src/python/_framework/base/__init__.py
  2. 34
      src/python/_framework/base/exceptions.py
  3. 229
      src/python/_framework/base/interfaces.py
  4. 299
      src/python/_framework/base/interfaces_test.py
  5. 0
      src/python/_framework/base/packets/__init__.py
  6. 64
      src/python/_framework/base/packets/_cancellation.py
  7. 32
      src/python/_framework/base/packets/_constants.py
  8. 99
      src/python/_framework/base/packets/_context.py
  9. 126
      src/python/_framework/base/packets/_emission.py
  10. 408
      src/python/_framework/base/packets/_ends.py
  11. 158
      src/python/_framework/base/packets/_expiration.py
  12. 440
      src/python/_framework/base/packets/_ingestion.py
  13. 269
      src/python/_framework/base/packets/_interfaces.py
  14. 394
      src/python/_framework/base/packets/_reception.py
  15. 201
      src/python/_framework/base/packets/_termination.py
  16. 393
      src/python/_framework/base/packets/_transmission.py
  17. 77
      src/python/_framework/base/packets/implementations.py
  18. 80
      src/python/_framework/base/packets/implementations_test.py
  19. 108
      src/python/_framework/base/packets/in_memory.py
  20. 84
      src/python/_framework/base/packets/interfaces.py
  21. 56
      src/python/_framework/base/packets/null.py
  22. 112
      src/python/_framework/base/packets/packets.py
  23. 91
      src/python/_framework/base/util.py

@ -0,0 +1,34 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Exceptions defined and used by the base layer of RPC Framework."""
class NoSuchMethodError(Exception):
"""Indicates that an operation with an unrecognized name has been called."""

@ -0,0 +1,229 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces defined and used by the base layer of RPC Framework."""
# TODO(nathaniel): Use Python's new enum library for enumerated types rather
# than constants merely placed close together.
import abc
# stream is referenced from specification in this module.
from _framework.foundation import stream # pylint: disable=unused-import
# Operation outcomes.
COMPLETED = 'completed'
CANCELLED = 'cancelled'
EXPIRED = 'expired'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
SERVICER_FAILURE = 'servicer failure'
SERVICED_FAILURE = 'serviced failure'
# Subscription categories.
FULL = 'full'
TERMINATION_ONLY = 'termination only'
NONE = 'none'
class OperationContext(object):
"""Provides operation-related information and action.
Attributes:
trace_id: A uuid.UUID identifying a particular set of related operations.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def is_active(self):
"""Describes whether the operation is active or has terminated."""
raise NotImplementedError()
@abc.abstractmethod
def add_termination_callback(self, callback):
"""Adds a function to be called upon operation termination.
Args:
callback: A callable that will be passed one of COMPLETED, CANCELLED,
EXPIRED, RECEPTION_FAILURE, TRANSMISSION_FAILURE, SERVICER_FAILURE, or
SERVICED_FAILURE.
"""
raise NotImplementedError()
@abc.abstractmethod
def time_remaining(self):
"""Describes the length of allowed time remaining for the operation.
Returns:
A nonnegative float indicating the length of allowed time in seconds
remaining for the operation to complete before it is considered to have
timed out.
"""
raise NotImplementedError()
@abc.abstractmethod
def fail(self, exception):
"""Indicates that the operation has failed.
Args:
exception: An exception germane to the operation failure. May be None.
"""
raise NotImplementedError()
class Servicer(object):
"""Interface for service implementations."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def service(self, name, context, output_consumer):
"""Services an operation.
Args:
name: The name of the operation.
context: A ServicerContext object affording contextual information and
actions.
output_consumer: A stream.Consumer that will accept output values of
the operation.
Returns:
A stream.Consumer that will accept input values for the operation.
Raises:
exceptions.NoSuchMethodError: If this Servicer affords no method with the
given name.
abandonment.Abandoned: If the operation has been aborted and there no
longer is any reason to service the operation.
"""
raise NotImplementedError()
class Operation(object):
"""Representation of an in-progress operation.
Attributes:
consumer: A stream.Consumer into which payloads constituting the operation's
input may be passed.
context: An OperationContext affording information and action about the
operation.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def cancel(self):
"""Cancels this operation."""
raise NotImplementedError()
class ServicedIngestor(object):
"""Responsible for accepting the result of an operation."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def consumer(self, operation_context):
"""Affords a consumer to which operation results will be passed.
Args:
operation_context: An OperationContext object for the current operation.
Returns:
A stream.Consumer to which the results of the current operation will be
passed.
Raises:
abandonment.Abandoned: If the operation has been aborted and there no
longer is any reason to service the operation.
"""
raise NotImplementedError()
class ServicedSubscription(object):
"""A sum type representing a serviced's interest in an operation.
Attributes:
category: One of FULL, TERMINATION_ONLY, or NONE.
ingestor: A ServicedIngestor. Must be present if category is FULL.
"""
__metaclass__ = abc.ABCMeta
class End(object):
"""Common type for entry-point objects on both sides of an operation."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def operation_stats(self):
"""Reports the number of terminated operations broken down by outcome.
Returns:
A dictionary from operation outcome constant (COMPLETED, CANCELLED,
EXPIRED, and so on) to an integer representing the number of operations
that terminated with that outcome.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_idle_action(self, action):
"""Adds an action to be called when this End has no ongoing operations.
Args:
action: A callable that accepts no arguments.
"""
raise NotImplementedError()
class Front(End):
"""Clientish objects that afford the invocation of operations."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def operate(
self, name, payload, complete, timeout, subscription, trace_id):
"""Commences an operation.
Args:
name: The name of the method invoked for the operation.
payload: An initial payload for the operation. May be None.
complete: A boolean indicating whether or not additional payloads to be
sent to the servicer may be supplied after this call.
timeout: A length of time in seconds to allow for the operation.
subscription: A ServicedSubscription for the operation.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
Returns:
An Operation object affording information and action about the operation
in progress.
"""
raise NotImplementedError()
class Back(End):
"""Serverish objects that perform the work of operations."""
__metaclass__ = abc.ABCMeta

@ -0,0 +1,299 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Abstract tests against the interfaces of the base layer of RPC Framework."""
import threading
import time
from _framework.base import interfaces
from _framework.base import util
from _framework.foundation import stream
from _framework.foundation import stream_testing
from _framework.foundation import stream_util
TICK = 0.1
SMALL_TIMEOUT = TICK * 50
STREAM_LENGTH = 100
SYNCHRONOUS_ECHO = 'synchronous echo'
ASYNCHRONOUS_ECHO = 'asynchronous echo'
IMMEDIATE_FAILURE = 'immediate failure'
TRIGGERED_FAILURE = 'triggered failure'
WAIT_ON_CONDITION = 'wait on condition'
EMPTY_OUTCOME_DICT = {
interfaces.COMPLETED: 0,
interfaces.CANCELLED: 0,
interfaces.EXPIRED: 0,
interfaces.RECEPTION_FAILURE: 0,
interfaces.TRANSMISSION_FAILURE: 0,
interfaces.SERVICER_FAILURE: 0,
interfaces.SERVICED_FAILURE: 0,
}
def _synchronous_echo(output_consumer):
return stream_util.TransformingConsumer(lambda x: x, output_consumer)
class AsynchronousEcho(stream.Consumer):
"""A stream.Consumer that echoes its input to another stream.Consumer."""
def __init__(self, output_consumer, pool):
self._lock = threading.Lock()
self._output_consumer = output_consumer
self._pool = pool
self._queue = []
self._spinning = False
def _spin(self, value, complete):
while True:
if value:
if complete:
self._output_consumer.consume_and_terminate(value)
else:
self._output_consumer.consume(value)
elif complete:
self._output_consumer.terminate()
with self._lock:
if self._queue:
value, complete = self._queue.pop(0)
else:
self._spinning = False
return
def consume(self, value):
with self._lock:
if self._spinning:
self._queue.append((value, False))
else:
self._spinning = True
self._pool.submit(self._spin, value, False)
def terminate(self):
with self._lock:
if self._spinning:
self._queue.append((None, True))
else:
self._spinning = True
self._pool.submit(self._spin, None, True)
def consume_and_terminate(self, value):
with self._lock:
if self._spinning:
self._queue.append((value, True))
else:
self._spinning = True
self._pool.submit(self._spin, value, True)
class TestServicer(interfaces.Servicer):
"""An interfaces.Servicer with instrumented for testing."""
def __init__(self, pool):
self._pool = pool
self.condition = threading.Condition()
self._released = False
def service(self, name, context, output_consumer):
if name == SYNCHRONOUS_ECHO:
return _synchronous_echo(output_consumer)
elif name == ASYNCHRONOUS_ECHO:
return AsynchronousEcho(output_consumer, self._pool)
elif name == IMMEDIATE_FAILURE:
raise ValueError()
elif name == TRIGGERED_FAILURE:
raise NotImplementedError
elif name == WAIT_ON_CONDITION:
with self.condition:
while not self._released:
self.condition.wait()
return _synchronous_echo(output_consumer)
else:
raise NotImplementedError()
def release(self):
with self.condition:
self._released = True
self.condition.notify_all()
class EasyServicedIngestor(interfaces.ServicedIngestor):
"""A trivial implementation of interfaces.ServicedIngestor."""
def __init__(self, consumer):
self._consumer = consumer
def consumer(self, operation_context):
"""See interfaces.ServicedIngestor.consumer for specification."""
return self._consumer
class FrontAndBackTest(object):
"""A test suite usable against any joined Front and Back."""
# Pylint doesn't know that this is a unittest.TestCase mix-in.
# pylint: disable=invalid-name
def testSimplestCall(self):
"""Tests the absolute simplest call - a one-packet fire-and-forget."""
self.front.operate(
SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT,
util.none_serviced_subscription(), 'test trace ID')
util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
# the back could have experienced execution up to this point:
# (1) The packet is still either in the front waiting to be transmitted
# or is somewhere on the link between the front and the back. The back has
# no idea that this test is even happening. Calling wait_for_idle on it
# would do no good because in this case the back is idle and the call would
# return with the packet bound for it still in the front or on the link.
back_operation_stats = self.back.operation_stats()
first_back_possibility = EMPTY_OUTCOME_DICT
# (2) The packet arrived at the back and the back completed the operation.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.COMPLETED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
# It's true that if the packet had arrived at the back and the back had
# begun processing that wait_for_idle could hold test execution until the
# back completed the operation, but that doesn't really collapse the
# possibility space down to one solution.
def testEntireEcho(self):
"""Tests a very simple one-packet-each-way round-trip."""
test_payload = 'test payload'
test_consumer = stream_testing.TestConsumer()
subscription = util.full_serviced_subscription(
EasyServicedIngestor(test_consumer))
self.front.operate(
ASYNCHRONOUS_ECHO, test_payload, True, SMALL_TIMEOUT, subscription,
'test trace ID')
util.wait_for_idle(self.front)
util.wait_for_idle(self.back)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED])
self.assertListEqual([(test_payload, True)], test_consumer.calls)
def testBidirectionalStreamingEcho(self):
"""Tests sending multiple packets each way."""
test_payload_template = 'test_payload: %03d'
test_payloads = [test_payload_template % i for i in range(STREAM_LENGTH)]
test_consumer = stream_testing.TestConsumer()
subscription = util.full_serviced_subscription(
EasyServicedIngestor(test_consumer))
operation = self.front.operate(
SYNCHRONOUS_ECHO, None, False, SMALL_TIMEOUT, subscription,
'test trace ID')
for test_payload in test_payloads:
operation.consumer.consume(test_payload)
operation.consumer.terminate()
util.wait_for_idle(self.front)
util.wait_for_idle(self.back)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED])
self.assertListEqual(test_payloads, test_consumer.values())
def testCancellation(self):
"""Tests cancelling a long-lived operation."""
test_consumer = stream_testing.TestConsumer()
subscription = util.full_serviced_subscription(
EasyServicedIngestor(test_consumer))
operation = self.front.operate(
ASYNCHRONOUS_ECHO, None, False, SMALL_TIMEOUT, subscription,
'test trace ID')
operation.cancel()
util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.CANCELLED])
util.wait_for_idle(self.back)
self.assertListEqual([], test_consumer.calls)
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
# the back could have experienced execution up to this point:
# (1) Both packets are still either in the front waiting to be transmitted
# or are somewhere on the link between the front and the back. The back has
# no idea that this test is even happening. Calling wait_for_idle on it
# would do no good because in this case the back is idle and the call would
# return with the packets bound for it still in the front or on the link.
back_operation_stats = self.back.operation_stats()
first_back_possibility = EMPTY_OUTCOME_DICT
# (2) Both packets arrived within SMALL_TIMEOUT of one another at the back.
# The back started processing based on the first packet and then stopped
# upon receiving the cancellation packet.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.CANCELLED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
def testExpiration(self):
"""Tests that operations time out."""
timeout = TICK * 2
allowance = TICK # How much extra time to
condition = threading.Condition()
test_payload = 'test payload'
subscription = util.termination_only_serviced_subscription()
start_time = time.time()
outcome_cell = [None]
termination_time_cell = [None]
def termination_action(outcome):
with condition:
outcome_cell[0] = outcome
termination_time_cell[0] = time.time()
condition.notify()
with condition:
operation = self.front.operate(
SYNCHRONOUS_ECHO, test_payload, False, timeout, subscription,
'test trace ID')
operation.context.add_termination_callback(termination_action)
while outcome_cell[0] is None:
condition.wait()
duration = termination_time_cell[0] - start_time
self.assertLessEqual(timeout, duration)
self.assertLess(duration, timeout + allowance)
self.assertEqual(interfaces.EXPIRED, outcome_cell[0])
util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.EXPIRED])
util.wait_for_idle(self.back)
self.assertLessEqual(1, self.back.operation_stats()[interfaces.EXPIRED])

@ -0,0 +1,64 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for operation cancellation."""
from _framework.base.packets import _interfaces
from _framework.base.packets import packets
class CancellationManager(_interfaces.CancellationManager):
"""An implementation of _interfaces.CancellationManager."""
def __init__(
self, lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
ingestion_manager: The _interfaces.IngestionManager for the operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
"""
self._lock = lock
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
def cancel(self):
"""See _interfaces.CancellationManager.cancel for specification."""
with self._lock:
self._termination_manager.abort(packets.Kind.CANCELLATION)
self._transmission_manager.abort(packets.Kind.CANCELLATION)
self._ingestion_manager.abort()
self._expiration_manager.abort()

@ -0,0 +1,32 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Private constants for the package."""
INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Base) internal error! :-('

@ -0,0 +1,99 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for operation context."""
import time
# _interfaces and packets are referenced from specification in this module.
from _framework.base import interfaces as base_interfaces
from _framework.base.packets import _interfaces # pylint: disable=unused-import
from _framework.base.packets import packets # pylint: disable=unused-import
class OperationContext(base_interfaces.OperationContext):
"""An implementation of base_interfaces.OperationContext."""
def __init__(
self, lock, operation_id, local_failure, termination_manager,
transmission_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
operation_id: An object identifying the operation.
local_failure: Whichever one of packets.Kind.SERVICED_FAILURE or
packets.Kind.SERVICER_FAILURE describes local failure of customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
"""
self._lock = lock
self._local_failure = local_failure
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._ingestion_manager = None
self._expiration_manager = None
self.operation_id = operation_id
def set_ingestion_and_expiration_managers(
self, ingestion_manager, expiration_manager):
"""Sets managers with which this OperationContext cooperates.
Args:
ingestion_manager: The _interfaces.IngestionManager for the operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
"""
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
def is_active(self):
"""See base_interfaces.OperationContext.is_active for specification."""
with self._lock:
return self._termination_manager.is_active()
def add_termination_callback(self, callback):
"""See base_interfaces.OperationContext.add_termination_callback."""
with self._lock:
self._termination_manager.add_callback(callback)
def time_remaining(self):
"""See interfaces.OperationContext.time_remaining for specification."""
with self._lock:
deadline = self._expiration_manager.deadline()
return max(0.0, deadline - time.time())
def fail(self, exception):
"""See interfaces.OperationContext.fail for specification."""
with self._lock:
self._termination_manager.abort(self._local_failure)
self._transmission_manager.abort(self._local_failure)
self._ingestion_manager.abort()
self._expiration_manager.abort()

@ -0,0 +1,126 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for handling emitted values."""
# packets is referenced from specifications in this module.
from _framework.base.packets import _interfaces
from _framework.base.packets import packets # pylint: disable=unused-import
class _EmissionManager(_interfaces.EmissionManager):
"""An implementation of _interfaces.EmissionManager."""
def __init__(
self, lock, failure_kind, termination_manager, transmission_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or
packets.Kind.SERVICER_FAILURE describes this object's methods being
called inappropriately by customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
"""
self._lock = lock
self._failure_kind = failure_kind
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._ingestion_manager = None
self._expiration_manager = None
self._emission_complete = False
def set_ingestion_manager_and_expiration_manager(
self, ingestion_manager, expiration_manager):
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
def _abort(self):
self._termination_manager.abort(self._failure_kind)
self._transmission_manager.abort(self._failure_kind)
self._ingestion_manager.abort()
self._expiration_manager.abort()
def consume(self, value):
with self._lock:
if self._emission_complete:
self._abort()
else:
self._transmission_manager.inmit(value, False)
def terminate(self):
with self._lock:
if not self._emission_complete:
self._termination_manager.emission_complete()
self._transmission_manager.inmit(None, True)
self._emission_complete = True
def consume_and_terminate(self, value):
with self._lock:
if self._emission_complete:
self._abort()
else:
self._termination_manager.emission_complete()
self._transmission_manager.inmit(value, True)
self._emission_complete = True
def front_emission_manager(lock, termination_manager, transmission_manager):
"""Creates an _interfaces.EmissionManager appropriate for front-side use.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the operation.
Returns:
An _interfaces.EmissionManager appropriate for front-side use.
"""
return _EmissionManager(
lock, packets.Kind.SERVICED_FAILURE, termination_manager,
transmission_manager)
def back_emission_manager(lock, termination_manager, transmission_manager):
"""Creates an _interfaces.EmissionManager appropriate for back-side use.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the operation.
Returns:
An _interfaces.EmissionManager appropriate for back-side use.
"""
return _EmissionManager(
lock, packets.Kind.SERVICER_FAILURE, termination_manager,
transmission_manager)

@ -0,0 +1,408 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Implementations of Fronts and Backs."""
import collections
import threading
import uuid
# _interfaces and packets are referenced from specification in this module.
from _framework.base import interfaces as base_interfaces
from _framework.base.packets import _cancellation
from _framework.base.packets import _context
from _framework.base.packets import _emission
from _framework.base.packets import _expiration
from _framework.base.packets import _ingestion
from _framework.base.packets import _interfaces # pylint: disable=unused-import
from _framework.base.packets import _reception
from _framework.base.packets import _termination
from _framework.base.packets import _transmission
from _framework.base.packets import interfaces
from _framework.base.packets import packets # pylint: disable=unused-import
from _framework.foundation import callable_util
_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
_OPERATION_OUTCOMES = (
base_interfaces.COMPLETED,
base_interfaces.CANCELLED,
base_interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE,
base_interfaces.TRANSMISSION_FAILURE,
base_interfaces.SERVICER_FAILURE,
base_interfaces.SERVICED_FAILURE,
)
class _EasyOperation(base_interfaces.Operation):
"""A trivial implementation of base_interfaces.Operation."""
def __init__(self, emission_manager, context, cancellation_manager):
"""Constructor.
Args:
emission_manager: The _interfaces.EmissionManager for the operation that
will accept values emitted by customer code.
context: The base_interfaces.OperationContext for use by the customer
during the operation.
cancellation_manager: The _interfaces.CancellationManager for the
operation.
"""
self.consumer = emission_manager
self.context = context
self._cancellation_manager = cancellation_manager
def cancel(self):
self._cancellation_manager.cancel()
class _Endlette(object):
"""Utility for stateful behavior common to Fronts and Backs."""
def __init__(self, pool):
"""Constructor.
Args:
pool: A thread pool to use when calling registered idle actions.
"""
self._lock = threading.Lock()
self._pool = pool
# Dictionary from operation IDs to ReceptionManager-or-None. A None value
# indicates an in-progress fire-and-forget operation for which the customer
# has chosen to ignore results.
self._operations = {}
self._stats = {outcome: 0 for outcome in _OPERATION_OUTCOMES}
self._idle_actions = []
def terminal_action(self, operation_id):
"""Constructs the termination action for a single operation.
Args:
operation_id: An operation ID.
Returns:
A callable that takes an operation outcome for an argument to be used as
the termination action for the operation associated with the given
operation ID.
"""
def termination_action(outcome):
with self._lock:
self._stats[outcome] += 1
self._operations.pop(operation_id, None)
if not self._operations:
for action in self._idle_actions:
self._pool.submit(callable_util.with_exceptions_logged(
action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE))
self._idle_actions = []
return termination_action
def __enter__(self):
self._lock.acquire()
def __exit__(self, exc_type, exc_val, exc_tb):
self._lock.release()
def get_operation(self, operation_id):
return self._operations.get(operation_id, None)
def add_operation(self, operation_id, operation_reception_manager):
self._operations[operation_id] = operation_reception_manager
def operation_stats(self):
with self._lock:
return dict(self._stats)
def add_idle_action(self, action):
with self._lock:
if self._operations:
self._idle_actions.append(action)
else:
self._pool.submit(callable_util.with_exceptions_logged(
action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE))
class _FrontManagement(
collections.namedtuple(
'_FrontManagement',
('reception', 'emission', 'operation', 'cancellation'))):
"""Just a trivial helper class to bundle four fellow-traveling objects."""
def _front_operate(
callback, work_pool, transmission_pool, utility_pool,
termination_action, operation_id, name, payload, complete, timeout,
subscription, trace_id):
"""Constructs objects necessary for front-side operation management.
Args:
callback: A callable that accepts packets.FrontToBackPackets and delivers
them to the other side of the operation. Execution of this callable may
take any arbitrary length of time.
work_pool: A thread pool in which to execute customer code.
transmission_pool: A thread pool to use for transmitting to the other side
of the operation.
utility_pool: A thread pool for utility tasks.
termination_action: A no-arg behavior to be called upon operation
completion.
operation_id: An object identifying the operation.
name: The name of the method being called during the operation.
payload: The first customer-significant value to be transmitted to the other
side. May be None if there is no such value or if the customer chose not
to pass it at operation invocation.
complete: A boolean indicating whether or not additional payloads will be
supplied by the customer.
timeout: A length of time in seconds to allow for the operation.
subscription: A base_interfaces.ServicedSubscription describing the
customer's interest in the results of the operation.
trace_id: A uuid.UUID identifying a set of related operations to which this
operation belongs. May be None.
Returns:
A _FrontManagement object bundling together the
_interfaces.ReceptionManager, _interfaces.EmissionManager,
_context.OperationContext, and _interfaces.CancellationManager for the
operation.
"""
lock = threading.Lock()
with lock:
termination_manager = _termination.front_termination_manager(
work_pool, utility_pool, termination_action, subscription.category)
transmission_manager = _transmission.front_transmission_manager(
lock, transmission_pool, callback, operation_id, name,
subscription.category, trace_id, timeout, termination_manager)
operation_context = _context.OperationContext(
lock, operation_id, packets.Kind.SERVICED_FAILURE,
termination_manager, transmission_manager)
emission_manager = _emission.front_emission_manager(
lock, termination_manager, transmission_manager)
ingestion_manager = _ingestion.front_ingestion_manager(
lock, work_pool, subscription, termination_manager,
transmission_manager, operation_context)
expiration_manager = _expiration.front_expiration_manager(
lock, termination_manager, transmission_manager, ingestion_manager,
timeout)
reception_manager = _reception.front_reception_manager(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
cancellation_manager = _cancellation.CancellationManager(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
emission_manager.set_ingestion_manager_and_expiration_manager(
ingestion_manager, expiration_manager)
ingestion_manager.set_expiration_manager(expiration_manager)
transmission_manager.inmit(payload, complete)
returned_reception_manager = (
None if subscription.category == base_interfaces.NONE
else reception_manager)
return _FrontManagement(
returned_reception_manager, emission_manager, operation_context,
cancellation_manager)
class Front(interfaces.Front):
"""An implementation of interfaces.Front."""
def __init__(self, work_pool, transmission_pool, utility_pool):
"""Constructor.
Args:
work_pool: A thread pool to be used for executing customer code.
transmission_pool: A thread pool to be used for transmitting values to
the other side of the operation.
utility_pool: A thread pool to be used for utility tasks.
"""
self._endlette = _Endlette(utility_pool)
self._work_pool = work_pool
self._transmission_pool = transmission_pool
self._utility_pool = utility_pool
self._callback = None
self._operations = {}
def join_rear_link(self, rear_link):
"""See interfaces.ForeLink.join_rear_link for specification."""
with self._endlette:
self._callback = rear_link.accept_front_to_back_ticket
def operation_stats(self):
"""See base_interfaces.End.operation_stats for specification."""
return self._endlette.operation_stats()
def add_idle_action(self, action):
"""See base_interfaces.End.add_idle_action for specification."""
self._endlette.add_idle_action(action)
def operate(
self, name, payload, complete, timeout, subscription, trace_id):
"""See base_interfaces.Front.operate for specification."""
operation_id = uuid.uuid4()
with self._endlette:
management = _front_operate(
self._callback, self._work_pool, self._transmission_pool,
self._utility_pool, self._endlette.terminal_action(operation_id),
operation_id, name, payload, complete, timeout, subscription,
trace_id)
self._endlette.add_operation(operation_id, management.reception)
return _EasyOperation(
management.emission, management.operation, management.cancellation)
def accept_back_to_front_ticket(self, ticket):
"""See interfaces.End.act for specification."""
with self._endlette:
reception_manager = self._endlette.get_operation(ticket.operation_id)
if reception_manager:
reception_manager.receive_packet(ticket)
def _back_operate(
servicer, callback, work_pool, transmission_pool, utility_pool,
termination_action, ticket, default_timeout, maximum_timeout):
"""Constructs objects necessary for back-side operation management.
Also begins back-side operation by feeding the first received ticket into the
constructed _interfaces.ReceptionManager.
Args:
servicer: An interfaces.Servicer for servicing operations.
callback: A callable that accepts packets.BackToFrontPackets and delivers
them to the other side of the operation. Execution of this callable may
take any arbitrary length of time.
work_pool: A thread pool in which to execute customer code.
transmission_pool: A thread pool to use for transmitting to the other side
of the operation.
utility_pool: A thread pool for utility tasks.
termination_action: A no-arg behavior to be called upon operation
completion.
ticket: The first packets.FrontToBackPacket received for the operation.
default_timeout: A length of time in seconds to be used as the default
time alloted for a single operation.
maximum_timeout: A length of time in seconds to be used as the maximum
time alloted for a single operation.
Returns:
The _interfaces.ReceptionManager to be used for the operation.
"""
lock = threading.Lock()
with lock:
termination_manager = _termination.back_termination_manager(
work_pool, utility_pool, termination_action, ticket.subscription)
transmission_manager = _transmission.back_transmission_manager(
lock, transmission_pool, callback, ticket.operation_id,
termination_manager, ticket.subscription)
operation_context = _context.OperationContext(
lock, ticket.operation_id, packets.Kind.SERVICER_FAILURE,
termination_manager, transmission_manager)
emission_manager = _emission.back_emission_manager(
lock, termination_manager, transmission_manager)
ingestion_manager = _ingestion.back_ingestion_manager(
lock, work_pool, servicer, termination_manager,
transmission_manager, operation_context, emission_manager)
expiration_manager = _expiration.back_expiration_manager(
lock, termination_manager, transmission_manager, ingestion_manager,
ticket.timeout, default_timeout, maximum_timeout)
reception_manager = _reception.back_reception_manager(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
emission_manager.set_ingestion_manager_and_expiration_manager(
ingestion_manager, expiration_manager)
ingestion_manager.set_expiration_manager(expiration_manager)
reception_manager.receive_packet(ticket)
return reception_manager
class Back(interfaces.Back):
"""An implementation of interfaces.Back."""
def __init__(
self, servicer, work_pool, transmission_pool, utility_pool,
default_timeout, maximum_timeout):
"""Constructor.
Args:
servicer: An interfaces.Servicer for servicing operations.
work_pool: A thread pool in which to execute customer code.
transmission_pool: A thread pool to use for transmitting to the other side
of the operation.
utility_pool: A thread pool for utility tasks.
default_timeout: A length of time in seconds to be used as the default
time alloted for a single operation.
maximum_timeout: A length of time in seconds to be used as the maximum
time alloted for a single operation.
"""
self._endlette = _Endlette(utility_pool)
self._servicer = servicer
self._work_pool = work_pool
self._transmission_pool = transmission_pool
self._utility_pool = utility_pool
self._default_timeout = default_timeout
self._maximum_timeout = maximum_timeout
self._callback = None
def join_fore_link(self, fore_link):
"""See interfaces.RearLink.join_fore_link for specification."""
with self._endlette:
self._callback = fore_link.accept_back_to_front_ticket
def accept_front_to_back_ticket(self, ticket):
"""See interfaces.RearLink.accept_front_to_back_ticket for specification."""
with self._endlette:
reception_manager = self._endlette.get_operation(ticket.operation_id)
if reception_manager is None:
reception_manager = _back_operate(
self._servicer, self._callback, self._work_pool,
self._transmission_pool, self._utility_pool,
self._endlette.terminal_action(ticket.operation_id), ticket,
self._default_timeout, self._maximum_timeout)
self._endlette.add_operation(ticket.operation_id, reception_manager)
else:
reception_manager.receive_packet(ticket)
def operation_stats(self):
"""See base_interfaces.End.operation_stats for specification."""
return self._endlette.operation_stats()
def add_idle_action(self, action):
"""See base_interfaces.End.add_idle_action for specification."""
self._endlette.add_idle_action(action)

@ -0,0 +1,158 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for operation expiration."""
import time
from _framework.base.packets import _interfaces
from _framework.base.packets import packets
from _framework.foundation import later
class _ExpirationManager(_interfaces.ExpirationManager):
"""An implementation of _interfaces.ExpirationManager."""
def __init__(
self, lock, termination_manager, transmission_manager, ingestion_manager,
commencement, timeout, maximum_timeout):
"""Constructor.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
ingestion_manager: The _interfaces.IngestionManager for the operation.
commencement: The time in seconds since the epoch at which the operation
began.
timeout: A length of time in seconds to allow for the operation to run.
maximum_timeout: The maximum length of time in seconds to allow for the
operation to run despite what is requested via this object's
change_timout method.
"""
self._lock = lock
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._ingestion_manager = ingestion_manager
self._commencement = commencement
self._maximum_timeout = maximum_timeout
self._timeout = timeout
self._deadline = commencement + timeout
self._index = None
self._future = None
def _expire(self, index):
with self._lock:
if self._future is not None and index == self._index:
self._future = None
self._termination_manager.abort(packets.Kind.EXPIRATION)
self._transmission_manager.abort(packets.Kind.EXPIRATION)
self._ingestion_manager.abort()
def start(self):
self._index = 0
self._future = later.later(self._timeout, lambda: self._expire(0))
def change_timeout(self, timeout):
if self._future is not None and timeout != self._timeout:
self._future.cancel()
new_timeout = min(timeout, self._maximum_timeout)
new_index = self._index + 1
self._timeout = new_timeout
self._deadline = self._commencement + new_timeout
self._index = new_index
delay = self._deadline - time.time()
self._future = later.later(
delay, lambda: self._expire(new_index))
def deadline(self):
return self._deadline
def abort(self):
if self._future:
self._future.cancel()
self._future = None
self._deadline_index = None
def front_expiration_manager(
lock, termination_manager, transmission_manager, ingestion_manager,
timeout):
"""Creates an _interfaces.ExpirationManager appropriate for front-side use.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
ingestion_manager: The _interfaces.IngestionManager for the operation.
timeout: A length of time in seconds to allow for the operation to run.
Returns:
An _interfaces.ExpirationManager appropriate for front-side use.
"""
commencement = time.time()
expiration_manager = _ExpirationManager(
lock, termination_manager, transmission_manager, ingestion_manager,
commencement, timeout, timeout)
expiration_manager.start()
return expiration_manager
def back_expiration_manager(
lock, termination_manager, transmission_manager, ingestion_manager,
timeout, default_timeout, maximum_timeout):
"""Creates an _interfaces.ExpirationManager appropriate for back-side use.
Args:
lock: The operation-wide lock.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
ingestion_manager: The _interfaces.IngestionManager for the operation.
timeout: A length of time in seconds to allow for the operation to run. May
be None in which case default_timeout will be used.
default_timeout: The default length of time in seconds to allow for the
operation to run if the front-side customer has not specified such a value
(or if the value they specified is not yet known).
maximum_timeout: The maximum length of time in seconds to allow for the
operation to run.
Returns:
An _interfaces.ExpirationManager appropriate for back-side use.
"""
commencement = time.time()
expiration_manager = _ExpirationManager(
lock, termination_manager, transmission_manager, ingestion_manager,
commencement, default_timeout if timeout is None else timeout,
maximum_timeout)
expiration_manager.start()
return expiration_manager

@ -0,0 +1,440 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for ingestion during an operation."""
import abc
import collections
from _framework.base import exceptions
from _framework.base import interfaces
from _framework.base.packets import _constants
from _framework.base.packets import _interfaces
from _framework.base.packets import packets
from _framework.foundation import abandonment
from _framework.foundation import callable_util
from _framework.foundation import stream
_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
_CONSUME_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
class _ConsumerCreation(collections.namedtuple(
'_ConsumerCreation', ('consumer', 'remote_error', 'abandoned'))):
"""A sum type for the outcome of ingestion initialization.
Either consumer will be non-None, remote_error will be True, or abandoned will
be True.
Attributes:
consumer: A stream.Consumer for ingesting payloads.
remote_error: A boolean indicating that the consumer could not be created
due to an error on the remote side of the operation.
abandoned: A boolean indicating that the consumer creation was abandoned.
"""
class _EmptyConsumer(stream.Consumer):
"""A no-operative stream.Consumer that ignores all inputs and calls."""
def consume(self, value):
"""See stream.Consumer.consume for specification."""
def terminate(self):
"""See stream.Consumer.terminate for specification."""
def consume_and_terminate(self, value):
"""See stream.Consumer.consume_and_terminate for specification."""
class _ConsumerCreator(object):
"""Common specification of different consumer-creating behavior."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def create_consumer(self, requirement):
"""Creates the stream.Consumer to which customer payloads will be delivered.
Any exceptions raised by this method should be attributed to and treated as
defects in the serviced or servicer code called by this method.
Args:
requirement: A value required by this _ConsumerCreator for consumer
creation.
Returns:
A _ConsumerCreation describing the result of consumer creation.
"""
raise NotImplementedError()
class _FrontConsumerCreator(_ConsumerCreator):
"""A _ConsumerCreator appropriate for front-side use."""
def __init__(self, subscription, operation_context):
"""Constructor.
Args:
subscription: The serviced's interfaces.ServicedSubscription for the
operation.
operation_context: The interfaces.OperationContext object for the
operation.
"""
self._subscription = subscription
self._operation_context = operation_context
def create_consumer(self, requirement):
"""See _ConsumerCreator.create_consumer for specification."""
if self._subscription.category == interfaces.FULL:
try:
return _ConsumerCreation(
self._subscription.ingestor.consumer(self._operation_context),
False, False)
except abandonment.Abandoned:
return _ConsumerCreation(None, False, True)
else:
return _ConsumerCreation(_EmptyConsumer(), False, False)
class _BackConsumerCreator(_ConsumerCreator):
"""A _ConsumerCreator appropriate for back-side use."""
def __init__(self, servicer, operation_context, emission_consumer):
"""Constructor.
Args:
servicer: The interfaces.Servicer that will service the operation.
operation_context: The interfaces.OperationContext object for the
operation.
emission_consumer: The stream.Consumer object to which payloads emitted
from the operation will be passed.
"""
self._servicer = servicer
self._operation_context = operation_context
self._emission_consumer = emission_consumer
def create_consumer(self, requirement):
"""See _ConsumerCreator.create_consumer for full specification.
Args:
requirement: The name of the Servicer method to be called during this
operation.
Returns:
A _ConsumerCreation describing the result of consumer creation.
"""
try:
return _ConsumerCreation(
self._servicer.service(
requirement, self._operation_context, self._emission_consumer),
False, False)
except exceptions.NoSuchMethodError:
return _ConsumerCreation(None, True, False)
except abandonment.Abandoned:
return _ConsumerCreation(None, False, True)
class _WrappedConsumer(object):
"""Wraps a consumer to catch the exceptions that it is allowed to throw."""
def __init__(self, consumer):
"""Constructor.
Args:
consumer: A stream.Consumer that may raise abandonment.Abandoned from any
of its methods.
"""
self._consumer = consumer
def moar(self, payload, complete):
"""Makes progress with the wrapped consumer.
This method catches all exceptions allowed to be thrown by the wrapped
consumer. Any exceptions raised by this method should be blamed on the
customer-supplied consumer.
Args:
payload: A customer-significant payload object. May be None only if
complete is True.
complete: Whether or not the end of the payload sequence has been reached.
May be False only if payload is not None.
Returns:
True if the wrapped consumer made progress or False if the wrapped
consumer raised abandonment.Abandoned to indicate its abandonment of
progress.
"""
try:
if payload:
if complete:
self._consumer.consume_and_terminate(payload)
else:
self._consumer.consume(payload)
else:
self._consumer.terminate()
return True
except abandonment.Abandoned:
return False
class _IngestionManager(_interfaces.IngestionManager):
"""An implementation of _interfaces.IngestionManager."""
def __init__(
self, lock, pool, consumer_creator, failure_kind, termination_manager,
transmission_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
consumer_creator: A _ConsumerCreator wrapping the portion of customer code
that when called returns the stream.Consumer with which the customer
code will ingest payload values.
failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or
packets.Kind.SERVICER_FAILURE describes local failure of customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
"""
self._lock = lock
self._pool = pool
self._consumer_creator = consumer_creator
self._failure_kind = failure_kind
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = None
self._wrapped_ingestion_consumer = None
self._pending_ingestion = []
self._ingestion_complete = False
self._processing = False
def set_expiration_manager(self, expiration_manager):
self._expiration_manager = expiration_manager
def _abort_internal_only(self):
self._wrapped_ingestion_consumer = None
self._pending_ingestion = None
def _abort_and_notify(self, outcome):
self._abort_internal_only()
self._termination_manager.abort(outcome)
self._transmission_manager.abort(outcome)
self._expiration_manager.abort()
def _next(self):
"""Computes the next step for ingestion.
Returns:
A payload, complete, continue triplet indicating what payload (if any) is
available to feed into customer code, whether or not the sequence of
payloads has terminated, and whether or not there is anything
immediately actionable to call customer code to do.
"""
if self._pending_ingestion is None:
return None, False, False
elif self._pending_ingestion:
payload = self._pending_ingestion.pop(0)
complete = self._ingestion_complete and not self._pending_ingestion
return payload, complete, True
elif self._ingestion_complete:
return None, True, True
else:
return None, False, False
def _process(self, wrapped_ingestion_consumer, payload, complete):
"""A method to call to execute customer code.
This object's lock must *not* be held when calling this method.
Args:
wrapped_ingestion_consumer: The _WrappedConsumer with which to pass
payloads to customer code.
payload: A customer payload. May be None only if complete is True.
complete: Whether or not the sequence of payloads to pass to the customer
has concluded.
"""
while True:
consumption_outcome = callable_util.call_logging_exceptions(
wrapped_ingestion_consumer.moar, _CONSUME_EXCEPTION_LOG_MESSAGE,
payload, complete)
if consumption_outcome.exception is None:
if consumption_outcome.return_value:
with self._lock:
if complete:
self._pending_ingestion = None
self._termination_manager.ingestion_complete()
return
else:
payload, complete, moar = self._next()
if not moar:
self._processing = False
return
else:
with self._lock:
if self._pending_ingestion is not None:
self._abort_and_notify(self._failure_kind)
self._processing = False
return
else:
with self._lock:
self._abort_and_notify(self._failure_kind)
self._processing = False
return
def start(self, requirement):
if self._pending_ingestion is not None:
def initialize():
consumer_creation_outcome = callable_util.call_logging_exceptions(
self._consumer_creator.create_consumer,
_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement)
if consumer_creation_outcome.return_value is None:
with self._lock:
self._abort_and_notify(self._failure_kind)
self._processing = False
elif consumer_creation_outcome.return_value.remote_error:
with self._lock:
self._abort_and_notify(packets.Kind.RECEPTION_FAILURE)
self._processing = False
elif consumer_creation_outcome.return_value.abandoned:
with self._lock:
if self._pending_ingestion is not None:
self._abort_and_notify(self._failure_kind)
self._processing = False
else:
wrapped_ingestion_consumer = _WrappedConsumer(
consumer_creation_outcome.return_value.consumer)
with self._lock:
self._wrapped_ingestion_consumer = wrapped_ingestion_consumer
payload, complete, moar = self._next()
if not moar:
self._processing = False
return
self._process(wrapped_ingestion_consumer, payload, complete)
self._pool.submit(
callable_util.with_exceptions_logged(
initialize, _constants.INTERNAL_ERROR_LOG_MESSAGE))
self._processing = True
def consume(self, payload):
if self._ingestion_complete:
self._abort_and_notify(self._failure_kind)
elif self._pending_ingestion is not None:
if self._processing:
self._pending_ingestion.append(payload)
else:
self._pool.submit(
callable_util.with_exceptions_logged(
self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
self._wrapped_ingestion_consumer, payload, False)
self._processing = True
def terminate(self):
if self._ingestion_complete:
self._abort_and_notify(self._failure_kind)
else:
self._ingestion_complete = True
if self._pending_ingestion is not None and not self._processing:
self._pool.submit(
callable_util.with_exceptions_logged(
self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
self._wrapped_ingestion_consumer, None, True)
self._processing = True
def consume_and_terminate(self, payload):
if self._ingestion_complete:
self._abort_and_notify(self._failure_kind)
else:
self._ingestion_complete = True
if self._pending_ingestion is not None:
if self._processing:
self._pending_ingestion.append(payload)
else:
self._pool.submit(
callable_util.with_exceptions_logged(
self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
self._wrapped_ingestion_consumer, payload, True)
self._processing = True
def abort(self):
"""See _interfaces.IngestionManager.abort for specification."""
self._abort_internal_only()
def front_ingestion_manager(
lock, pool, subscription, termination_manager, transmission_manager,
operation_context):
"""Creates an IngestionManager appropriate for front-side use.
Args:
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
subscription: A base_interfaces.ServicedSubscription indicating the
customer's interest in the results of the operation.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
operation_context: A base_interfaces.OperationContext for the operation.
Returns:
An IngestionManager appropriate for front-side use.
"""
ingestion_manager = _IngestionManager(
lock, pool, _FrontConsumerCreator(subscription, operation_context),
packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager)
ingestion_manager.start(None)
return ingestion_manager
def back_ingestion_manager(
lock, pool, servicer, termination_manager, transmission_manager,
operation_context, emission_consumer):
"""Creates an IngestionManager appropriate for back-side use.
Args:
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
servicer: A base_interfaces.Servicer for servicing the operation.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
operation_context: A base_interfaces.OperationContext for the operation.
emission_consumer: The _interfaces.EmissionConsumer for the operation.
Returns:
An IngestionManager appropriate for back-side use.
"""
ingestion_manager = _IngestionManager(
lock, pool, _BackConsumerCreator(
servicer, operation_context, emission_consumer),
packets.Kind.SERVICER_FAILURE, termination_manager, transmission_manager)
return ingestion_manager

@ -0,0 +1,269 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Package-internal interfaces."""
import abc
# base_interfaces and packets are referenced from specification in this module.
from _framework.base import interfaces as base_interfaces # pylint: disable=unused-import
from _framework.base.packets import packets # pylint: disable=unused-import
from _framework.foundation import stream
class TerminationManager(object):
"""An object responsible for handling the termination of an operation."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def is_active(self):
"""Reports whether or not the operation is active.
Returns:
True if the operation is active or False if the operation has terminated.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_callback(self, callback):
"""Registers a callback to be called on operation termination.
If the operation has already terminated, the callback will be called
immediately.
Args:
callback: A callable that will be passed one of base_interfaces.COMPLETED,
base_interfaces.CANCELLED, base_interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
base_interfaces.SERVICER_FAILURE, or base_interfaces.SERVICED_FAILURE.
"""
raise NotImplementedError()
@abc.abstractmethod
def emission_complete(self):
"""Indicates that emissions from customer code have completed."""
raise NotImplementedError()
@abc.abstractmethod
def transmission_complete(self):
"""Indicates that transmissions to the remote end are complete."""
raise NotImplementedError()
@abc.abstractmethod
def ingestion_complete(self):
"""Indicates that customer code ingestion of received values is complete."""
raise NotImplementedError()
@abc.abstractmethod
def abort(self, kind):
"""Indicates that the operation must abort for the indicated reason.
Args:
kind: A value of packets.Kind indicating operation abortion.
"""
raise NotImplementedError()
class TransmissionManager(object):
"""A manager responsible for transmitting to the other end of an operation."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def inmit(self, emission, complete):
"""Accepts a value for transmission to the other end of the operation.
Args:
emission: A value of some significance to the customer to be transmitted
to the other end of the operation. May be None only if complete is True.
complete: A boolean that if True indicates that customer code has emitted
all values it intends to emit.
"""
raise NotImplementedError()
@abc.abstractmethod
def abort(self, kind):
"""Indicates that the operation has aborted for the indicated reason.
Args:
kind: A value of packets.Kind indicating operation abortion.
"""
raise NotImplementedError()
class EmissionManager(stream.Consumer):
"""A manager of values emitted by customer code."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def set_ingestion_manager_and_expiration_manager(
self, ingestion_manager, expiration_manager):
"""Sets two other objects with which this EmissionManager will cooperate.
Args:
ingestion_manager: The IngestionManager for the operation.
expiration_manager: The ExpirationManager for the operation.
"""
raise NotImplementedError()
@abc.abstractmethod
def consume(self, value):
"""Accepts a value emitted by customer code.
This method should only be called by customer code.
Args:
value: Any value of significance to the customer.
"""
raise NotImplementedError()
@abc.abstractmethod
def terminate(self):
"""Indicates that no more values will be emitted by customer code.
This method should only be called by customer code.
Implementations of this method may be idempotent and forgive customer code
calling this method more than once.
"""
raise NotImplementedError()
@abc.abstractmethod
def consume_and_terminate(self, value):
"""Accepts the last value emitted by customer code.
This method should only be called by customer code.
Args:
value: Any value of significance to the customer.
"""
raise NotImplementedError()
class IngestionManager(stream.Consumer):
"""A manager responsible for executing customer code."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def set_expiration_manager(self, expiration_manager):
"""Sets the ExpirationManager with which this object will cooperate."""
@abc.abstractmethod
def start(self, requirement):
"""Commences execution of customer code.
Args:
requirement: Some value unavailable at the time of this object's
construction that is required to begin executing customer code.
"""
raise NotImplementedError()
@abc.abstractmethod
def consume(self, payload):
"""Accepts a customer-significant value to be supplied to customer code.
Args:
payload: Some customer-significant value.
"""
raise NotImplementedError()
@abc.abstractmethod
def terminate(self):
"""Indicates the end of values to be supplied to customer code."""
raise NotImplementedError()
@abc.abstractmethod
def consume_and_terminate(self, payload):
"""Accepts the last value to be supplied to customer code.
Args:
payload: Some customer-significant value (and the last such value).
"""
raise NotImplementedError()
@abc.abstractmethod
def abort(self):
"""Indicates to this manager that the operation has aborted."""
raise NotImplementedError()
class ExpirationManager(object):
"""A manager responsible for aborting the operation if it runs out of time."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def change_timeout(self, timeout):
"""Changes the timeout allotted for the operation.
Operation duration is always measure from the beginning of the operation;
calling this method changes the operation's allotted time to timeout total
seconds, not timeout seconds from the time of this method call.
Args:
timeout: A length of time in seconds to allow for the operation.
"""
raise NotImplementedError()
@abc.abstractmethod
def deadline(self):
"""Returns the time until which the operation is allowed to run.
Returns:
The time (seconds since the epoch) at which the operation will expire.
"""
raise NotImplementedError()
@abc.abstractmethod
def abort(self):
"""Indicates to this manager that the operation has aborted."""
raise NotImplementedError()
class ReceptionManager(object):
"""A manager responsible for receiving packets from the other end."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def receive_packet(self, packet):
"""Handle a packet from the other side of the operation.
Args:
packet: A packets.BackToFrontPacket or packets.FrontToBackPacket
appropriate to this end of the operation and this object.
"""
raise NotImplementedError()
class CancellationManager(object):
"""A manager of operation cancellation."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def cancel(self):
"""Cancels the operation."""
raise NotImplementedError()

@ -0,0 +1,394 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for packet reception."""
import abc
from _framework.base.packets import _interfaces
from _framework.base.packets import packets
class _Receiver(object):
"""Common specification of different packet-handling behavior."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def abort_if_abortive(self, packet):
"""Aborts the operation if the packet is abortive.
Args:
packet: A just-arrived packet.
Returns:
A boolean indicating whether or not this Receiver aborted the operation
based on the packet.
"""
raise NotImplementedError()
@abc.abstractmethod
def receive(self, packet):
"""Handles a just-arrived packet.
Args:
packet: A just-arrived packet.
Returns:
A boolean indicating whether or not the packet was terminal (i.e. whether
or not non-abortive packets are legal after this one).
"""
raise NotImplementedError()
@abc.abstractmethod
def reception_failure(self):
"""Aborts the operation with an indication of reception failure."""
raise NotImplementedError()
def _abort(
category, termination_manager, transmission_manager, ingestion_manager,
expiration_manager):
"""Indicates abortion with the given category to the given managers."""
termination_manager.abort(category)
transmission_manager.abort(category)
ingestion_manager.abort()
expiration_manager.abort()
def _abort_if_abortive(
packet, abortive, termination_manager, transmission_manager,
ingestion_manager, expiration_manager):
"""Determines a packet's being abortive and if so aborts the operation.
Args:
packet: A just-arrived packet.
abortive: A callable that takes a packet and returns an operation category
indicating that the operation should be aborted or None indicating that
the operation should not be aborted.
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
expiration_manager: The operation's _interfaces.ExpirationManager.
Returns:
True if the operation was aborted; False otherwise.
"""
abort_category = abortive(packet)
if abort_category is None:
return False
else:
_abort(
abort_category, termination_manager, transmission_manager,
ingestion_manager, expiration_manager)
return True
def _reception_failure(
termination_manager, transmission_manager, ingestion_manager,
expiration_manager):
"""Aborts the operation with an indication of reception failure."""
_abort(
packets.Kind.RECEPTION_FAILURE, termination_manager, transmission_manager,
ingestion_manager, expiration_manager)
class _BackReceiver(_Receiver):
"""Packet-handling specific to the back side of an operation."""
def __init__(
self, termination_manager, transmission_manager, ingestion_manager,
expiration_manager):
"""Constructor.
Args:
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
expiration_manager: The operation's _interfaces.ExpirationManager.
"""
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
self._first_packet_seen = False
self._last_packet_seen = False
def _abortive(self, packet):
"""Determines whether or not (and if so, how) a packet is abortive.
Args:
packet: A just-arrived packet.
Returns:
One of packets.Kind.CANCELLATION, packets.Kind.SERVICED_FAILURE, or
packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive
and how, or None, indicating that the packet is not abortive.
"""
if packet.kind is packets.Kind.CANCELLATION:
return packets.Kind.CANCELLATION
elif packet.kind is packets.Kind.EXPIRATION:
return packets.Kind.EXPIRATION
elif packet.kind is packets.Kind.SERVICED_FAILURE:
return packets.Kind.SERVICED_FAILURE
elif packet.kind is packets.Kind.RECEPTION_FAILURE:
return packets.Kind.SERVICED_FAILURE
elif (packet.kind in (packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE) and
self._first_packet_seen):
return packets.Kind.RECEPTION_FAILURE
elif self._last_packet_seen:
return packets.Kind.RECEPTION_FAILURE
else:
return None
def abort_if_abortive(self, packet):
"""See _Receiver.abort_if_abortive for specification."""
return _abort_if_abortive(
packet, self._abortive, self._termination_manager,
self._transmission_manager, self._ingestion_manager,
self._expiration_manager)
def receive(self, packet):
"""See _Receiver.receive for specification."""
if packet.timeout is not None:
self._expiration_manager.change_timeout(packet.timeout)
if packet.kind is packets.Kind.COMMENCEMENT:
self._first_packet_seen = True
self._ingestion_manager.start(packet.name)
if packet.payload is not None:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is packets.Kind.CONTINUATION:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is packets.Kind.COMPLETION:
self._last_packet_seen = True
if packet.payload is None:
self._ingestion_manager.terminate()
else:
self._ingestion_manager.consume_and_terminate(packet.payload)
else:
self._first_packet_seen = True
self._last_packet_seen = True
self._ingestion_manager.start(packet.name)
if packet.payload is None:
self._ingestion_manager.terminate()
else:
self._ingestion_manager.consume_and_terminate(packet.payload)
def reception_failure(self):
"""See _Receiver.reception_failure for specification."""
_reception_failure(
self._termination_manager, self._transmission_manager,
self._ingestion_manager, self._expiration_manager)
class _FrontReceiver(_Receiver):
"""Packet-handling specific to the front side of an operation."""
def __init__(
self, termination_manager, transmission_manager, ingestion_manager,
expiration_manager):
"""Constructor.
Args:
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
expiration_manager: The operation's _interfaces.ExpirationManager.
"""
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
self._last_packet_seen = False
def _abortive(self, packet):
"""Determines whether or not (and if so, how) a packet is abortive.
Args:
packet: A just-arrived packet.
Returns:
One of packets.Kind.EXPIRATION, packets.Kind.SERVICER_FAILURE, or
packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive
and how, or None, indicating that the packet is not abortive.
"""
if packet.kind is packets.Kind.EXPIRATION:
return packets.Kind.EXPIRATION
elif packet.kind is packets.Kind.SERVICER_FAILURE:
return packets.Kind.SERVICER_FAILURE
elif packet.kind is packets.Kind.RECEPTION_FAILURE:
return packets.Kind.SERVICER_FAILURE
elif self._last_packet_seen:
return packets.Kind.RECEPTION_FAILURE
else:
return None
def abort_if_abortive(self, packet):
"""See _Receiver.abort_if_abortive for specification."""
return _abort_if_abortive(
packet, self._abortive, self._termination_manager,
self._transmission_manager, self._ingestion_manager,
self._expiration_manager)
def receive(self, packet):
"""See _Receiver.receive for specification."""
if packet.kind is packets.Kind.CONTINUATION:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is packets.Kind.COMPLETION:
self._last_packet_seen = True
if packet.payload is None:
self._ingestion_manager.terminate()
else:
self._ingestion_manager.consume_and_terminate(packet.payload)
def reception_failure(self):
"""See _Receiver.reception_failure for specification."""
_reception_failure(
self._termination_manager, self._transmission_manager,
self._ingestion_manager, self._expiration_manager)
class _ReceptionManager(_interfaces.ReceptionManager):
"""A ReceptionManager based around a _Receiver passed to it."""
def __init__(self, lock, receiver):
"""Constructor.
Args:
lock: The operation-servicing-wide lock object.
receiver: A _Receiver responsible for handling received packets.
"""
self._lock = lock
self._receiver = receiver
self._lowest_unseen_sequence_number = 0
self._out_of_sequence_packets = {}
self._completed_sequence_number = None
self._aborted = False
def _sequence_failure(self, packet):
"""Determines a just-arrived packet's sequential legitimacy.
Args:
packet: A just-arrived packet.
Returns:
True if the packet is sequentially legitimate; False otherwise.
"""
if packet.sequence_number < self._lowest_unseen_sequence_number:
return True
elif packet.sequence_number in self._out_of_sequence_packets:
return True
elif (self._completed_sequence_number is not None and
self._completed_sequence_number <= packet.sequence_number):
return True
else:
return False
def _process(self, packet):
"""Process those packets ready to be processed.
Args:
packet: A just-arrived packet the sequence number of which matches this
_ReceptionManager's _lowest_unseen_sequence_number field.
"""
while True:
completed = self._receiver.receive(packet)
if completed:
self._out_of_sequence_packets.clear()
self._completed_sequence_number = packet.sequence_number
self._lowest_unseen_sequence_number = packet.sequence_number + 1
return
else:
next_packet = self._out_of_sequence_packets.pop(
packet.sequence_number + 1, None)
if next_packet is None:
self._lowest_unseen_sequence_number = packet.sequence_number + 1
return
else:
packet = next_packet
def receive_packet(self, packet):
"""See _interfaces.ReceptionManager.receive_packet for specification."""
with self._lock:
if self._aborted:
return
elif self._sequence_failure(packet):
self._receiver.reception_failure()
self._aborted = True
elif self._receiver.abort_if_abortive(packet):
self._aborted = True
elif packet.sequence_number == self._lowest_unseen_sequence_number:
self._process(packet)
else:
self._out_of_sequence_packets[packet.sequence_number] = packet
def front_reception_manager(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager):
"""Creates a _interfaces.ReceptionManager for front-side use.
Args:
lock: The operation-servicing-wide lock object.
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
expiration_manager: The operation's _interfaces.ExpirationManager.
Returns:
A _interfaces.ReceptionManager appropriate for front-side use.
"""
return _ReceptionManager(
lock, _FrontReceiver(
termination_manager, transmission_manager, ingestion_manager,
expiration_manager))
def back_reception_manager(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager):
"""Creates a _interfaces.ReceptionManager for back-side use.
Args:
lock: The operation-servicing-wide lock object.
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
expiration_manager: The operation's _interfaces.ExpirationManager.
Returns:
A _interfaces.ReceptionManager appropriate for back-side use.
"""
return _ReceptionManager(
lock, _BackReceiver(
termination_manager, transmission_manager, ingestion_manager,
expiration_manager))

@ -0,0 +1,201 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for operation termination."""
from _framework.base import interfaces
from _framework.base.packets import _constants
from _framework.base.packets import _interfaces
from _framework.base.packets import packets
from _framework.foundation import callable_util
_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
# TODO(nathaniel): enum module.
_EMISSION = 'emission'
_TRANSMISSION = 'transmission'
_INGESTION = 'ingestion'
_FRONT_NOT_LISTENING_REQUIREMENTS = (_TRANSMISSION,)
_BACK_NOT_LISTENING_REQUIREMENTS = (_EMISSION, _INGESTION,)
_LISTENING_REQUIREMENTS = (_TRANSMISSION, _INGESTION,)
_KINDS_TO_OUTCOMES = {
packets.Kind.COMPLETION: interfaces.COMPLETED,
packets.Kind.CANCELLATION: interfaces.CANCELLED,
packets.Kind.EXPIRATION: interfaces.EXPIRED,
packets.Kind.RECEPTION_FAILURE: interfaces.RECEPTION_FAILURE,
packets.Kind.TRANSMISSION_FAILURE: interfaces.TRANSMISSION_FAILURE,
packets.Kind.SERVICER_FAILURE: interfaces.SERVICER_FAILURE,
packets.Kind.SERVICED_FAILURE: interfaces.SERVICED_FAILURE,
}
class _TerminationManager(_interfaces.TerminationManager):
"""An implementation of _interfaces.TerminationManager."""
def __init__(
self, work_pool, utility_pool, action, requirements, local_failure):
"""Constructor.
Args:
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
requirements: A combination of _EMISSION, _TRANSMISSION, and _INGESTION
identifying what must finish for the operation to be considered
completed.
local_failure: A packets.Kind specifying what constitutes local failure of
customer work.
"""
self._work_pool = work_pool
self._utility_pool = utility_pool
self._action = action
self._local_failure = local_failure
self._has_locally_failed = False
self._outstanding_requirements = set(requirements)
self._kind = None
self._callbacks = []
def _terminate(self, kind):
"""Terminates the operation.
Args:
kind: One of packets.Kind.COMPLETION, packets.Kind.CANCELLATION,
packets.Kind.EXPIRATION, packets.Kind.RECEPTION_FAILURE,
packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or
packets.Kind.SERVICED_FAILURE.
"""
self._outstanding_requirements = None
callbacks = list(self._callbacks)
self._callbacks = None
self._kind = kind
outcome = _KINDS_TO_OUTCOMES[kind]
act = callable_util.with_exceptions_logged(
self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
if self._has_locally_failed:
self._utility_pool.submit(act, outcome)
else:
def call_callbacks_and_act(callbacks, outcome):
for callback in callbacks:
callback_outcome = callable_util.call_logging_exceptions(
callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome)
if callback_outcome.exception is not None:
outcome = _KINDS_TO_OUTCOMES[self._local_failure]
break
self._utility_pool.submit(act, outcome)
self._work_pool.submit(callable_util.with_exceptions_logged(
call_callbacks_and_act,
_constants.INTERNAL_ERROR_LOG_MESSAGE),
callbacks, outcome)
def is_active(self):
"""See _interfaces.TerminationManager.is_active for specification."""
return self._outstanding_requirements is not None
def add_callback(self, callback):
"""See _interfaces.TerminationManager.add_callback for specification."""
if not self._has_locally_failed:
if self._outstanding_requirements is None:
self._work_pool.submit(
callable_util.with_exceptions_logged(
callback, _CALLBACK_EXCEPTION_LOG_MESSAGE),
_KINDS_TO_OUTCOMES[self._kind])
else:
self._callbacks.append(callback)
def emission_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_EMISSION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
def transmission_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_TRANSMISSION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
def ingestion_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_INGESTION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
def abort(self, kind):
"""See _interfaces.TerminationManager.abort for specification."""
if kind == self._local_failure:
self._has_failed_locally = True
if self._outstanding_requirements is not None:
self._terminate(kind)
def front_termination_manager(work_pool, utility_pool, action, subscription):
"""Creates a TerminationManager appropriate for front-side use.
Args:
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
subscription: One of interfaces.FULL, interfaces.termination_only, or
interfaces.NONE.
Returns:
A TerminationManager appropriate for front-side use.
"""
return _TerminationManager(
work_pool, utility_pool, action,
_FRONT_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else
_LISTENING_REQUIREMENTS, packets.Kind.SERVICED_FAILURE)
def back_termination_manager(work_pool, utility_pool, action, subscription):
"""Creates a TerminationManager appropriate for back-side use.
Args:
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
subscription: One of interfaces.FULL, interfaces.termination_only, or
interfaces.NONE.
Returns:
A TerminationManager appropriate for back-side use.
"""
return _TerminationManager(
work_pool, utility_pool, action,
_BACK_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else
_LISTENING_REQUIREMENTS, packets.Kind.SERVICER_FAILURE)

@ -0,0 +1,393 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for packet transmission during an operation."""
import abc
from _framework.base import interfaces
from _framework.base.packets import _constants
from _framework.base.packets import _interfaces
from _framework.base.packets import packets
from _framework.foundation import callable_util
_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
_FRONT_TO_BACK_NO_TRANSMISSION_KINDS = (
packets.Kind.SERVICER_FAILURE,
)
_BACK_TO_FRONT_NO_TRANSMISSION_KINDS = (
packets.Kind.CANCELLATION,
packets.Kind.SERVICED_FAILURE,
)
class _Packetizer(object):
"""Common specification of different packet-creating behavior."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def packetize(self, operation_id, sequence_number, payload, complete):
"""Creates a packet indicating ordinary operation progress.
Args:
operation_id: The operation ID for the current operation.
sequence_number: A sequence number for the packet.
payload: A customer payload object. May be None if sequence_number is
zero or complete is true.
complete: A boolean indicating whether or not the packet should describe
itself as (but for a later indication of operation abortion) the last
packet to be sent.
Returns:
An object of an appropriate type suitable for transmission to the other
side of the operation.
"""
raise NotImplementedError()
@abc.abstractmethod
def packetize_abortion(self, operation_id, sequence_number, kind):
"""Creates a packet indicating that the operation is aborted.
Args:
operation_id: The operation ID for the current operation.
sequence_number: A sequence number for the packet.
kind: One of the values of packets.Kind indicating operational abortion.
Returns:
An object of an appropriate type suitable for transmission to the other
side of the operation, or None if transmission is not appropriate for
the given kind.
"""
raise NotImplementedError()
class _FrontPacketizer(_Packetizer):
"""Front-side packet-creating behavior."""
def __init__(self, name, subscription, trace_id, timeout):
"""Constructor.
Args:
name: The name of the operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
interfaces.NONE describing the interest the front has in packets sent
from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
"""
self._name = name
self._subscription = subscription
self._trace_id = trace_id
self._timeout = timeout
def packetize(self, operation_id, sequence_number, payload, complete):
"""See _Packetizer.packetize for specification."""
if sequence_number:
return packets.FrontToBackPacket(
operation_id, sequence_number,
packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
self._name, self._subscription, self._trace_id, payload,
self._timeout)
else:
return packets.FrontToBackPacket(
operation_id, 0,
packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT,
self._name, self._subscription, self._trace_id, payload,
self._timeout)
def packetize_abortion(self, operation_id, sequence_number, kind):
"""See _Packetizer.packetize_abortion for specification."""
if kind in _FRONT_TO_BACK_NO_TRANSMISSION_KINDS:
return None
else:
return packets.FrontToBackPacket(
operation_id, sequence_number, kind, None, None, None, None, None)
class _BackPacketizer(_Packetizer):
"""Back-side packet-creating behavior."""
def packetize(self, operation_id, sequence_number, payload, complete):
"""See _Packetizer.packetize for specification."""
return packets.BackToFrontPacket(
operation_id, sequence_number,
packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
payload)
def packetize_abortion(self, operation_id, sequence_number, kind):
"""See _Packetizer.packetize_abortion for specification."""
if kind in _BACK_TO_FRONT_NO_TRANSMISSION_KINDS:
return None
else:
return packets.BackToFrontPacket(
operation_id, sequence_number, kind, None)
class TransmissionManager(_interfaces.TransmissionManager):
"""A _interfaces.TransmissionManager on which other managers may be set."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def set_ingestion_and_expiration_managers(
self, ingestion_manager, expiration_manager):
"""Sets two of the other managers with which this manager may interact.
Args:
ingestion_manager: The _interfaces.IngestionManager associated with the
current operation.
expiration_manager: The _interfaces.ExpirationManager associated with the
current operation.
"""
raise NotImplementedError()
class _EmptyTransmissionManager(TransmissionManager):
"""A completely no-operative _interfaces.TransmissionManager."""
def set_ingestion_and_expiration_managers(
self, ingestion_manager, expiration_manager):
"""See overriden method for specification."""
def inmit(self, emission, complete):
"""See _interfaces.TransmissionManager.inmit for specification."""
def abort(self, category):
"""See _interfaces.TransmissionManager.abort for specification."""
class _TransmittingTransmissionManager(TransmissionManager):
"""A TransmissionManager implementation that sends packets."""
def __init__(
self, lock, pool, callback, operation_id, packetizer,
termination_manager):
"""Constructor.
Args:
lock: The operation-servicing-wide lock object.
pool: A thread pool in which the work of transmitting packets will be
performed.
callback: A callable that accepts packets and sends them to the other side
of the operation.
operation_id: The operation's ID.
packetizer: A _Packetizer for packet creation.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
"""
self._lock = lock
self._pool = pool
self._callback = callback
self._operation_id = operation_id
self._packetizer = packetizer
self._termination_manager = termination_manager
self._ingestion_manager = None
self._expiration_manager = None
self._emissions = []
self._emission_complete = False
self._kind = None
self._lowest_unused_sequence_number = 0
self._transmitting = False
def set_ingestion_and_expiration_managers(
self, ingestion_manager, expiration_manager):
"""See overridden method for specification."""
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
def _lead_packet(self, emission, complete):
"""Creates a packet suitable for leading off the transmission loop.
Args:
emission: A customer payload object to be sent to the other side of the
operation.
complete: Whether or not the sequence of customer payloads ends with
the passed object.
Returns:
A packet with which to lead off the transmission loop.
"""
sequence_number = self._lowest_unused_sequence_number
self._lowest_unused_sequence_number += 1
return self._packetizer.packetize(
self._operation_id, sequence_number, emission, complete)
def _abortive_response_packet(self, kind):
"""Creates a packet indicating operation abortion.
Args:
kind: One of the values of packets.Kind indicating operational abortion.
Returns:
A packet indicating operation abortion.
"""
packet = self._packetizer.packetize_abortion(
self._operation_id, self._lowest_unused_sequence_number, kind)
if packet is None:
return None
else:
self._lowest_unused_sequence_number += 1
return packet
def _next_packet(self):
"""Creates the next packet to be sent to the other side of the operation.
Returns:
A (completed, packet) tuple comprised of a boolean indicating whether or
not the sequence of packets has completed normally and a packet to send
to the other side if the sequence of packets hasn't completed. The tuple
will never have both a True first element and a non-None second element.
"""
if self._emissions is None:
return False, None
elif self._kind is None:
if self._emissions:
payload = self._emissions.pop(0)
complete = self._emission_complete and not self._emissions
sequence_number = self._lowest_unused_sequence_number
self._lowest_unused_sequence_number += 1
return complete, self._packetizer.packetize(
self._operation_id, sequence_number, payload, complete)
else:
return self._emission_complete, None
else:
packet = self._abortive_response_packet(self._kind)
self._emissions = None
return False, None if packet is None else packet
def _transmit(self, packet):
"""Commences the transmission loop sending packets.
Args:
packet: A packet to be sent to the other side of the operation.
"""
def transmit(packet):
while True:
transmission_outcome = callable_util.call_logging_exceptions(
self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, packet)
if transmission_outcome.exception is None:
with self._lock:
complete, packet = self._next_packet()
if packet is None:
if complete:
self._termination_manager.transmission_complete()
self._transmitting = False
return
else:
with self._lock:
self._emissions = None
self._termination_manager.abort(packets.Kind.TRANSMISSION_FAILURE)
self._ingestion_manager.abort()
self._expiration_manager.abort()
self._transmitting = False
return
self._pool.submit(callable_util.with_exceptions_logged(
transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), packet)
self._transmitting = True
def inmit(self, emission, complete):
"""See _interfaces.TransmissionManager.inmit for specification."""
if self._emissions is not None and self._kind is None:
self._emission_complete = complete
if self._transmitting:
self._emissions.append(emission)
else:
self._transmit(self._lead_packet(emission, complete))
def abort(self, kind):
"""See _interfaces.TransmissionManager.abort for specification."""
if self._emissions is not None and self._kind is None:
self._kind = kind
if not self._transmitting:
packet = self._abortive_response_packet(kind)
self._emissions = None
if packet is not None:
self._transmit(packet)
def front_transmission_manager(
lock, pool, callback, operation_id, name, subscription, trace_id, timeout,
termination_manager):
"""Creates a TransmissionManager appropriate for front-side use.
Args:
lock: The operation-servicing-wide lock object.
pool: A thread pool in which the work of transmitting packets will be
performed.
callback: A callable that accepts packets and sends them to the other side
of the operation.
operation_id: The operation's ID.
name: The name of the operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
interfaces.NONE describing the interest the front has in packets sent
from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
Returns:
A TransmissionManager appropriate for front-side use.
"""
return _TransmittingTransmissionManager(
lock, pool, callback, operation_id, _FrontPacketizer(
name, subscription, trace_id, timeout),
termination_manager)
def back_transmission_manager(
lock, pool, callback, operation_id, termination_manager, subscription):
"""Creates a TransmissionManager appropriate for back-side use.
Args:
lock: The operation-servicing-wide lock object.
pool: A thread pool in which the work of transmitting packets will be
performed.
callback: A callable that accepts packets and sends them to the other side
of the operation.
operation_id: The operation's ID.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
interfaces.NONE describing the interest the front has in packets sent from
the back.
Returns:
A TransmissionManager appropriate for back-side use.
"""
if subscription == interfaces.NONE:
return _EmptyTransmissionManager()
else:
return _TransmittingTransmissionManager(
lock, pool, callback, operation_id, _BackPacketizer(),
termination_manager)

@ -0,0 +1,77 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into the packet-exchange-based implementation the base layer."""
# interfaces is referenced from specification in this module.
from _framework.base.packets import _ends
from _framework.base.packets import interfaces # pylint: disable=unused-import
def front(work_pool, transmission_pool, utility_pool):
"""Factory function for creating interfaces.Fronts.
Args:
work_pool: A thread pool to be used for doing work within the created Front
object.
transmission_pool: A thread pool to be used within the created Front object
for transmitting values to some Back object.
utility_pool: A thread pool to be used within the created Front object for
utility tasks.
Returns:
An interfaces.Front.
"""
return _ends.Front(work_pool, transmission_pool, utility_pool)
def back(
servicer, work_pool, transmission_pool, utility_pool, default_timeout,
maximum_timeout):
"""Factory function for creating interfaces.Backs.
Args:
servicer: An interfaces.Servicer for servicing operations.
work_pool: A thread pool to be used for doing work within the created Back
object.
transmission_pool: A thread pool to be used within the created Back object
for transmitting values to some Front object.
utility_pool: A thread pool to be used within the created Back object for
utility tasks.
default_timeout: A length of time in seconds to be used as the default
time alloted for a single operation.
maximum_timeout: A length of time in seconds to be used as the maximum
time alloted for a single operation.
Returns:
An interfaces.Back.
"""
return _ends.Back(
servicer, work_pool, transmission_pool, utility_pool, default_timeout,
maximum_timeout)

@ -0,0 +1,80 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests for _framework.base.packets.implementations."""
import unittest
from _framework.base import interfaces_test
from _framework.base import util
from _framework.base.packets import implementations
from _framework.foundation import logging_pool
POOL_MAX_WORKERS = 100
DEFAULT_TIMEOUT = 30
MAXIMUM_TIMEOUT = 60
class ImplementationsTest(
interfaces_test.FrontAndBackTest, unittest.TestCase):
def setUp(self):
self.memory_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.front_work_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.front_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.front_utility_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.back_work_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.back_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.back_utility_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.test_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.test_servicer = interfaces_test.TestServicer(self.test_pool)
self.front = implementations.front(
self.front_work_pool, self.front_transmission_pool,
self.front_utility_pool)
self.back = implementations.back(
self.test_servicer, self.back_work_pool, self.back_transmission_pool,
self.back_utility_pool, DEFAULT_TIMEOUT, MAXIMUM_TIMEOUT)
self.front.join_rear_link(self.back)
self.back.join_fore_link(self.front)
def tearDown(self):
util.wait_for_idle(self.back)
util.wait_for_idle(self.front)
self.memory_transmission_pool.shutdown(wait=True)
self.front_work_pool.shutdown(wait=True)
self.front_transmission_pool.shutdown(wait=True)
self.front_utility_pool.shutdown(wait=True)
self.back_work_pool.shutdown(wait=True)
self.back_transmission_pool.shutdown(wait=True)
self.back_utility_pool.shutdown(wait=True)
self.test_pool.shutdown(wait=True)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,108 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into the packet-exchange-based implementation the base layer."""
import threading
from _framework.base.packets import _constants
from _framework.base.packets import interfaces
from _framework.foundation import callable_util
class _Serializer(object):
"""A utility for serializing values that may arrive concurrently."""
def __init__(self, pool):
self._lock = threading.Lock()
self._pool = pool
self._sink = None
self._spinning = False
self._values = []
def _spin(self, sink, value):
while True:
sink(value)
with self._lock:
if self._sink is None or not self._values:
self._spinning = False
return
else:
sink, value = self._sink, self._values.pop(0)
def set_sink(self, sink):
with self._lock:
self._sink = sink
if sink is not None and self._values and not self._spinning:
self._spinning = True
self._pool.submit(
callable_util.with_exceptions_logged(
self._spin, _constants.INTERNAL_ERROR_LOG_MESSAGE),
sink, self._values.pop(0))
def add_value(self, value):
with self._lock:
if self._sink and not self._spinning:
self._spinning = True
self._pool.submit(
callable_util.with_exceptions_logged(
self._spin, _constants.INTERNAL_ERROR_LOG_MESSAGE),
self._sink, value)
else:
self._values.append(value)
class Link(interfaces.ForeLink, interfaces.RearLink):
"""A trivial implementation of interfaces.ForeLink and interfaces.RearLink."""
def __init__(self, pool):
"""Constructor.
Args:
pool: A thread pool to be used for serializing ticket exchange in each
direction.
"""
self._front_to_back = _Serializer(pool)
self._back_to_front = _Serializer(pool)
def join_fore_link(self, fore_link):
"""See interfaces.RearLink.join_fore_link for specification."""
self._back_to_front.set_sink(fore_link.accept_back_to_front_ticket)
def join_rear_link(self, rear_link):
"""See interfaces.ForeLink.join_rear_link for specification."""
self._front_to_back.set_sink(rear_link.accept_front_to_back_ticket)
def accept_front_to_back_ticket(self, ticket):
"""See interfaces.ForeLink.accept_front_to_back_ticket for specification."""
self._front_to_back.add_value(ticket)
def accept_back_to_front_ticket(self, ticket):
"""See interfaces.RearLink.accept_back_to_front_ticket for specification."""
self._back_to_front.add_value(ticket)

@ -0,0 +1,84 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces defined and used by the base layer of RPC Framework."""
import abc
# packets is referenced from specifications in this module.
from _framework.base import interfaces
from _framework.base.packets import packets # pylint: disable=unused-import
class ForeLink(object):
"""Accepts back-to-front tickets and emits front-to-back tickets."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def accept_back_to_front_ticket(self, ticket):
"""Accept a packets.BackToFrontPacket.
Args:
ticket: Any packets.BackToFrontPacket.
"""
raise NotImplementedError()
@abc.abstractmethod
def join_rear_link(self, rear_link):
"""Mates this object with a peer with which it will exchange tickets."""
raise NotImplementedError()
class RearLink(object):
"""Accepts front-to-back tickets and emits back-to-front tickets."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def accept_front_to_back_ticket(self, ticket):
"""Accepts a packets.FrontToBackPacket.
Args:
ticket: Any packets.FrontToBackPacket.
"""
raise NotImplementedError()
@abc.abstractmethod
def join_fore_link(self, fore_link):
"""Mates this object with a peer with which it will exchange tickets."""
raise NotImplementedError()
class Front(ForeLink, interfaces.Front):
"""Clientish objects that operate by sending and receiving tickets."""
__metaclass__ = abc.ABCMeta
class Back(RearLink, interfaces.Back):
"""Serverish objects that operate by sending and receiving tickets."""
__metaclass__ = abc.ABCMeta

@ -0,0 +1,56 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Null links that ignore tickets passed to them."""
from _framework.base.packets import interfaces
class _NullForeLink(interfaces.ForeLink):
"""A do-nothing ForeLink."""
def accept_back_to_front_ticket(self, ticket):
pass
def join_rear_link(self, rear_link):
raise NotImplementedError()
class _NullRearLink(interfaces.RearLink):
"""A do-nothing RearLink."""
def accept_front_to_back_ticket(self, ticket):
pass
def join_fore_link(self, fore_link):
raise NotImplementedError()
NULL_FORE_LINK = _NullForeLink()
NULL_REAR_LINK = _NullRearLink()

@ -0,0 +1,112 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Packets used between fronts and backs."""
import collections
import enum
# interfaces is referenced from specifications in this module.
from _framework.base import interfaces # pylint: disable=unused-import
@enum.unique
class Kind(enum.Enum):
"""Identifies the overall kind of a ticket."""
COMMENCEMENT = 'commencement'
CONTINUATION = 'continuation'
COMPLETION = 'completion'
ENTIRE = 'entire'
CANCELLATION = 'cancellation'
EXPIRATION = 'expiration'
SERVICER_FAILURE = 'servicer failure'
SERVICED_FAILURE = 'serviced failure'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
class FrontToBackPacket(
collections.namedtuple(
'FrontToBackPacket',
['operation_id', 'sequence_number', 'kind', 'name', 'subscription',
'trace_id', 'payload', 'timeout'])):
"""A sum type for all values sent from a front to a back.
Attributes:
operation_id: A unique-with-respect-to-equality hashable object identifying
a particular operation.
sequence_number: A zero-indexed integer sequence number identifying the
packet's place among all the packets sent from front to back for this
particular operation. Must be zero if kind is Kind.COMMENCEMENT or
Kind.ENTIRE. Must be positive for any other kind.
kind: One of Kind.COMMENCEMENT, Kind.CONTINUATION, Kind.COMPLETION,
Kind.ENTIRE, Kind.CANCELLATION, Kind.EXPIRATION, Kind.SERVICED_FAILURE,
Kind.RECEPTION_FAILURE, or Kind.TRANSMISSION_FAILURE.
name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT
or Kind.ENTIRE. Must be None for any other kind.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
interfaces.NONE describing the interest the front has in packets sent from
the back. Must be present if kind is Kind.COMMENCEMENT or Kind.ENTIRE.
Must be None for any other kind.
trace_id: A uuid.UUID identifying a set of related operations to which this
operation belongs. May be None.
payload: A customer payload object. Must be present if kind is
Kind.CONTINUATION. Must be None if kind is Kind.CANCELLATION. May be None
for any other kind.
timeout: An optional length of time (measured from the beginning of the
operation) to allow for the entire operation. If None, a default value on
the back will be used. If present and excessively large, the back may
limit the operation to a smaller duration of its choice. May be present
for any ticket kind; setting a value on a later ticket allows fronts
to request time extensions (or even time reductions!) on in-progress
operations.
"""
class BackToFrontPacket(
collections.namedtuple(
'BackToFrontPacket',
['operation_id', 'sequence_number', 'kind', 'payload'])):
"""A sum type for all values sent from a back to a front.
Attributes:
operation_id: A unique-with-respect-to-equality hashable object identifying
a particular operation.
sequence_number: A zero-indexed integer sequence number identifying the
packet's place among all the packets sent from back to front for this
particular operation.
kind: One of Kind.CONTINUATION, Kind.COMPLETION, Kind.EXPIRATION,
Kind.SERVICER_FAILURE, Kind.RECEPTION_FAILURE, or
Kind.TRANSMISSION_FAILURE.
payload: A customer payload object. Must be present if kind is
Kind.CONTINUATION. May be None if kind is Kind.COMPLETION. Must be None if
kind is Kind.EXPIRATION, Kind.SERVICER_FAILURE, Kind.RECEPTION_FAILURE, or
Kind.TRANSMISSION_FAILURE.
"""

@ -0,0 +1,91 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities helpful for working with the base layer of RPC Framework."""
import collections
import threading
from _framework.base import interfaces
class _ServicedSubscription(
collections.namedtuple('_ServicedSubscription', ['category', 'ingestor']),
interfaces.ServicedSubscription):
"""See interfaces.ServicedSubscription for specification."""
_NONE_SUBSCRIPTION = _ServicedSubscription(interfaces.NONE, None)
_TERMINATION_ONLY_SUBSCRIPTION = _ServicedSubscription(
interfaces.TERMINATION_ONLY, None)
def none_serviced_subscription():
"""Creates a "none" interfaces.ServicedSubscription object.
Returns:
An interfaces.ServicedSubscription indicating no subscription to an
operation's results (such as would be the case for a fire-and-forget
operation invocation).
"""
return _NONE_SUBSCRIPTION
def termination_only_serviced_subscription():
"""Creates a "termination only" interfaces.ServicedSubscription object.
Returns:
An interfaces.ServicedSubscription indicating that the front-side customer
is interested only in the overall termination outcome of the operation
(such as completion or expiration) and would ignore the actual results of
the operation.
"""
return _TERMINATION_ONLY_SUBSCRIPTION
def full_serviced_subscription(ingestor):
"""Creates a "full" interfaces.ServicedSubscription object.
Args:
ingestor: A ServicedIngestor.
Returns:
A ServicedSubscription object indicating a full subscription.
"""
return _ServicedSubscription(interfaces.FULL, ingestor)
def wait_for_idle(end):
"""Waits for an interfaces.End to complete all operations.
Args:
end: Any interfaces.End.
"""
event = threading.Event()
end.add_idle_action(event.set)
event.wait()
Loading…
Cancel
Save