Merge pull request #294 from nathanielmanistaatgoogle/enum

Merge enum-ification of Python RPC Framework.
pull/349/head
Craig Tiller 10 years ago
commit 1fb40badf1
  1. 11
      src/python/src/_adapter/_links_test.py
  2. 2
      src/python/src/_adapter/_lonely_rear_link_test.py
  3. 3
      src/python/src/_adapter/fore.py
  4. 49
      src/python/src/_framework/base/interfaces.py
  5. 44
      src/python/src/_framework/base/interfaces_test.py
  6. 25
      src/python/src/_framework/base/packets/_ends.py
  7. 2
      src/python/src/_framework/base/packets/_ingestion.py
  8. 5
      src/python/src/_framework/base/packets/_interfaces.py
  9. 82
      src/python/src/_framework/base/packets/_termination.py
  10. 34
      src/python/src/_framework/base/packets/_transmission.py
  11. 7
      src/python/src/_framework/base/packets/packets.py
  12. 15
      src/python/src/_framework/base/util.py
  13. 11
      src/python/src/_framework/face/_calls.py
  14. 28
      src/python/src/_framework/face/_control.py
  15. 43
      src/python/src/_framework/face/interfaces.py
  16. 24
      src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py

@ -80,8 +80,8 @@ class RoundTripTest(unittest.TestCase):
rear_link.start() rear_link.start()
front_to_back_ticket = tickets.FrontToBackPacket( front_to_back_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL, test_operation_id, 0, tickets.Kind.ENTIRE, test_method,
None, None, _TIMEOUT) interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket) rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition: with test_fore_link.condition:
@ -133,8 +133,9 @@ class RoundTripTest(unittest.TestCase):
rear_link.start() rear_link.start()
front_to_back_ticket = tickets.FrontToBackPacket( front_to_back_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL, test_operation_id, 0, tickets.Kind.ENTIRE, test_method,
None, test_front_to_back_datum, _TIMEOUT) interfaces.ServicedSubscription.Kind.FULL, None,
test_front_to_back_datum, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket) rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition: with test_fore_link.condition:
@ -196,7 +197,7 @@ class RoundTripTest(unittest.TestCase):
commencement_ticket = tickets.FrontToBackPacket( commencement_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.COMMENCEMENT, test_method, test_operation_id, 0, tickets.Kind.COMMENCEMENT, test_method,
interfaces.FULL, None, None, _TIMEOUT) interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
fore_sequence_number = 1 fore_sequence_number = 1
rear_link.accept_front_to_back_ticket(commencement_ticket) rear_link.accept_front_to_back_ticket(commencement_ticket)
for request in scenario.requests(): for request in scenario.requests():

@ -69,7 +69,7 @@ class LonelyRearLinkTest(unittest.TestCase):
front_to_back_ticket = packets.FrontToBackPacket( front_to_back_ticket = packets.FrontToBackPacket(
test_operation_id, 0, front_to_back_ticket_kind, test_method, test_operation_id, 0, front_to_back_ticket_kind, test_method,
interfaces.FULL, None, None, _TIMEOUT) interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket) rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with fore_link.condition: with fore_link.condition:

@ -116,7 +116,8 @@ class ForeLink(ticket_interfaces.ForeLink):
self._response_serializers[method]) self._response_serializers[method])
ticket = tickets.FrontToBackPacket( ticket = tickets.FrontToBackPacket(
call, 0, tickets.Kind.COMMENCEMENT, method, interfaces.FULL, None, None, call, 0, tickets.Kind.COMMENCEMENT, method,
interfaces.ServicedSubscription.Kind.FULL, None, None,
service_acceptance.deadline - time.time()) service_acceptance.deadline - time.time())
self._rear_link.accept_front_to_back_ticket(ticket) self._rear_link.accept_front_to_back_ticket(ticket)

@ -29,27 +29,24 @@
"""Interfaces defined and used by the base layer of RPC Framework.""" """Interfaces defined and used by the base layer of RPC Framework."""
# TODO(nathaniel): Use Python's new enum library for enumerated types rather
# than constants merely placed close together.
import abc import abc
import enum
# stream is referenced from specification in this module. # stream is referenced from specification in this module.
from _framework.foundation import stream # pylint: disable=unused-import from _framework.foundation import stream # pylint: disable=unused-import
# Operation outcomes.
COMPLETED = 'completed'
CANCELLED = 'cancelled'
EXPIRED = 'expired'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
SERVICER_FAILURE = 'servicer failure'
SERVICED_FAILURE = 'serviced failure'
# Subscription categories. @enum.unique
FULL = 'full' class Outcome(enum.Enum):
TERMINATION_ONLY = 'termination only' """Operation outcomes."""
NONE = 'none'
COMPLETED = 'completed'
CANCELLED = 'cancelled'
EXPIRED = 'expired'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
SERVICER_FAILURE = 'servicer failure'
SERVICED_FAILURE = 'serviced failure'
class OperationContext(object): class OperationContext(object):
@ -70,9 +67,7 @@ class OperationContext(object):
"""Adds a function to be called upon operation termination. """Adds a function to be called upon operation termination.
Args: Args:
callback: A callable that will be passed one of COMPLETED, CANCELLED, callback: A callable that will be passed an Outcome value.
EXPIRED, RECEPTION_FAILURE, TRANSMISSION_FAILURE, SERVICER_FAILURE, or
SERVICED_FAILURE.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -167,11 +162,20 @@ class ServicedSubscription(object):
"""A sum type representing a serviced's interest in an operation. """A sum type representing a serviced's interest in an operation.
Attributes: Attributes:
category: One of FULL, TERMINATION_ONLY, or NONE. kind: A Kind value.
ingestor: A ServicedIngestor. Must be present if category is FULL. ingestor: A ServicedIngestor. Must be present if kind is Kind.FULL. Must
be None if kind is Kind.TERMINATION_ONLY or Kind.NONE.
""" """
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
@enum.unique
class Kind(enum.Enum):
"""Kinds of subscription."""
FULL = 'full'
TERMINATION_ONLY = 'termination only'
NONE = 'none'
class End(object): class End(object):
"""Common type for entry-point objects on both sides of an operation.""" """Common type for entry-point objects on both sides of an operation."""
@ -182,9 +186,8 @@ class End(object):
"""Reports the number of terminated operations broken down by outcome. """Reports the number of terminated operations broken down by outcome.
Returns: Returns:
A dictionary from operation outcome constant (COMPLETED, CANCELLED, A dictionary from Outcome value to an integer identifying the number
EXPIRED, and so on) to an integer representing the number of operations of operations that terminated with that outcome.
that terminated with that outcome.
""" """
raise NotImplementedError() raise NotImplementedError()

@ -49,13 +49,13 @@ TRIGGERED_FAILURE = 'triggered failure'
WAIT_ON_CONDITION = 'wait on condition' WAIT_ON_CONDITION = 'wait on condition'
EMPTY_OUTCOME_DICT = { EMPTY_OUTCOME_DICT = {
interfaces.COMPLETED: 0, interfaces.Outcome.COMPLETED: 0,
interfaces.CANCELLED: 0, interfaces.Outcome.CANCELLED: 0,
interfaces.EXPIRED: 0, interfaces.Outcome.EXPIRED: 0,
interfaces.RECEPTION_FAILURE: 0, interfaces.Outcome.RECEPTION_FAILURE: 0,
interfaces.TRANSMISSION_FAILURE: 0, interfaces.Outcome.TRANSMISSION_FAILURE: 0,
interfaces.SERVICER_FAILURE: 0, interfaces.Outcome.SERVICER_FAILURE: 0,
interfaces.SERVICED_FAILURE: 0, interfaces.Outcome.SERVICED_FAILURE: 0,
} }
@ -169,7 +169,8 @@ class FrontAndBackTest(object):
SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT, SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT,
util.none_serviced_subscription(), 'test trace ID') util.none_serviced_subscription(), 'test trace ID')
util.wait_for_idle(self.front) util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
# Assuming nothing really pathological (such as pauses on the order of # Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways # SMALL_TIMEOUT interfering with this test) there are a two different ways
@ -183,7 +184,7 @@ class FrontAndBackTest(object):
first_back_possibility = EMPTY_OUTCOME_DICT first_back_possibility = EMPTY_OUTCOME_DICT
# (2) The packet arrived at the back and the back completed the operation. # (2) The packet arrived at the back and the back completed the operation.
second_back_possibility = dict(EMPTY_OUTCOME_DICT) second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.COMPLETED] = 1 second_back_possibility[interfaces.Outcome.COMPLETED] = 1
self.assertIn( self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility)) back_operation_stats, (first_back_possibility, second_back_possibility))
# It's true that if the packet had arrived at the back and the back had # It's true that if the packet had arrived at the back and the back had
@ -204,8 +205,10 @@ class FrontAndBackTest(object):
util.wait_for_idle(self.front) util.wait_for_idle(self.front)
util.wait_for_idle(self.back) util.wait_for_idle(self.back)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) self.assertEqual(
self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED]) 1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertEqual(
1, self.back.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertListEqual([(test_payload, True)], test_consumer.calls) self.assertListEqual([(test_payload, True)], test_consumer.calls)
def testBidirectionalStreamingEcho(self): def testBidirectionalStreamingEcho(self):
@ -226,8 +229,10 @@ class FrontAndBackTest(object):
util.wait_for_idle(self.front) util.wait_for_idle(self.front)
util.wait_for_idle(self.back) util.wait_for_idle(self.back)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) self.assertEqual(
self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED]) 1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertEqual(
1, self.back.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertListEqual(test_payloads, test_consumer.values()) self.assertListEqual(test_payloads, test_consumer.values())
def testCancellation(self): def testCancellation(self):
@ -242,7 +247,8 @@ class FrontAndBackTest(object):
operation.cancel() operation.cancel()
util.wait_for_idle(self.front) util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.CANCELLED]) self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.CANCELLED])
util.wait_for_idle(self.back) util.wait_for_idle(self.back)
self.assertListEqual([], test_consumer.calls) self.assertListEqual([], test_consumer.calls)
@ -260,7 +266,7 @@ class FrontAndBackTest(object):
# The back started processing based on the first packet and then stopped # The back started processing based on the first packet and then stopped
# upon receiving the cancellation packet. # upon receiving the cancellation packet.
second_back_possibility = dict(EMPTY_OUTCOME_DICT) second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.CANCELLED] = 1 second_back_possibility[interfaces.Outcome.CANCELLED] = 1
self.assertIn( self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility)) back_operation_stats, (first_back_possibility, second_back_possibility))
@ -292,8 +298,10 @@ class FrontAndBackTest(object):
duration = termination_time_cell[0] - start_time duration = termination_time_cell[0] - start_time
self.assertLessEqual(timeout, duration) self.assertLessEqual(timeout, duration)
self.assertLess(duration, timeout + allowance) self.assertLess(duration, timeout + allowance)
self.assertEqual(interfaces.EXPIRED, outcome_cell[0]) self.assertEqual(interfaces.Outcome.EXPIRED, outcome_cell[0])
util.wait_for_idle(self.front) util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.EXPIRED]) self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.EXPIRED])
util.wait_for_idle(self.back) util.wait_for_idle(self.back)
self.assertLessEqual(1, self.back.operation_stats()[interfaces.EXPIRED]) self.assertLessEqual(
1, self.back.operation_stats()[interfaces.Outcome.EXPIRED])

@ -51,13 +51,13 @@ from _framework.foundation import callable_util
_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' _IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
_OPERATION_OUTCOMES = ( _OPERATION_OUTCOMES = (
base_interfaces.COMPLETED, base_interfaces.Outcome.COMPLETED,
base_interfaces.CANCELLED, base_interfaces.Outcome.CANCELLED,
base_interfaces.EXPIRED, base_interfaces.Outcome.EXPIRED,
base_interfaces.RECEPTION_FAILURE, base_interfaces.Outcome.RECEPTION_FAILURE,
base_interfaces.TRANSMISSION_FAILURE, base_interfaces.Outcome.TRANSMISSION_FAILURE,
base_interfaces.SERVICER_FAILURE, base_interfaces.Outcome.SERVICER_FAILURE,
base_interfaces.SERVICED_FAILURE, base_interfaces.Outcome.SERVICED_FAILURE,
) )
@ -193,10 +193,10 @@ def _front_operate(
lock = threading.Lock() lock = threading.Lock()
with lock: with lock:
termination_manager = _termination.front_termination_manager( termination_manager = _termination.front_termination_manager(
work_pool, utility_pool, termination_action, subscription.category) work_pool, utility_pool, termination_action, subscription.kind)
transmission_manager = _transmission.front_transmission_manager( transmission_manager = _transmission.front_transmission_manager(
lock, transmission_pool, callback, operation_id, name, lock, transmission_pool, callback, operation_id, name,
subscription.category, trace_id, timeout, termination_manager) subscription.kind, trace_id, timeout, termination_manager)
operation_context = _context.OperationContext( operation_context = _context.OperationContext(
lock, operation_id, packets.Kind.SERVICED_FAILURE, lock, operation_id, packets.Kind.SERVICED_FAILURE,
termination_manager, transmission_manager) termination_manager, transmission_manager)
@ -225,9 +225,10 @@ def _front_operate(
transmission_manager.inmit(payload, complete) transmission_manager.inmit(payload, complete)
returned_reception_manager = ( if subscription.kind is base_interfaces.ServicedSubscription.Kind.NONE:
None if subscription.category == base_interfaces.NONE returned_reception_manager = None
else reception_manager) else:
returned_reception_manager = reception_manager
return _FrontManagement( return _FrontManagement(
returned_reception_manager, emission_manager, operation_context, returned_reception_manager, emission_manager, operation_context,

@ -111,7 +111,7 @@ class _FrontConsumerCreator(_ConsumerCreator):
def create_consumer(self, requirement): def create_consumer(self, requirement):
"""See _ConsumerCreator.create_consumer for specification.""" """See _ConsumerCreator.create_consumer for specification."""
if self._subscription.category == interfaces.FULL: if self._subscription.kind is interfaces.ServicedSubscription.Kind.FULL:
try: try:
return _ConsumerCreation( return _ConsumerCreation(
self._subscription.ingestor.consumer(self._operation_context), self._subscription.ingestor.consumer(self._operation_context),

@ -58,10 +58,7 @@ class TerminationManager(object):
immediately. immediately.
Args: Args:
callback: A callable that will be passed one of base_interfaces.COMPLETED, callback: A callable that will be passed a base_interfaces.Outcome value.
base_interfaces.CANCELLED, base_interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
base_interfaces.SERVICER_FAILURE, or base_interfaces.SERVICED_FAILURE.
""" """
raise NotImplementedError() raise NotImplementedError()

@ -29,6 +29,8 @@
"""State and behavior for operation termination.""" """State and behavior for operation termination."""
import enum
from _framework.base import interfaces from _framework.base import interfaces
from _framework.base.packets import _constants from _framework.base.packets import _constants
from _framework.base.packets import _interfaces from _framework.base.packets import _interfaces
@ -37,26 +39,32 @@ from _framework.foundation import callable_util
_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!' _CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
# TODO(nathaniel): enum module.
_EMISSION = 'emission'
_TRANSMISSION = 'transmission'
_INGESTION = 'ingestion'
_FRONT_NOT_LISTENING_REQUIREMENTS = (_TRANSMISSION,)
_BACK_NOT_LISTENING_REQUIREMENTS = (_EMISSION, _INGESTION,)
_LISTENING_REQUIREMENTS = (_TRANSMISSION, _INGESTION,)
_KINDS_TO_OUTCOMES = { _KINDS_TO_OUTCOMES = {
packets.Kind.COMPLETION: interfaces.COMPLETED, packets.Kind.COMPLETION: interfaces.Outcome.COMPLETED,
packets.Kind.CANCELLATION: interfaces.CANCELLED, packets.Kind.CANCELLATION: interfaces.Outcome.CANCELLED,
packets.Kind.EXPIRATION: interfaces.EXPIRED, packets.Kind.EXPIRATION: interfaces.Outcome.EXPIRED,
packets.Kind.RECEPTION_FAILURE: interfaces.RECEPTION_FAILURE, packets.Kind.RECEPTION_FAILURE: interfaces.Outcome.RECEPTION_FAILURE,
packets.Kind.TRANSMISSION_FAILURE: interfaces.TRANSMISSION_FAILURE, packets.Kind.TRANSMISSION_FAILURE: interfaces.Outcome.TRANSMISSION_FAILURE,
packets.Kind.SERVICER_FAILURE: interfaces.SERVICER_FAILURE, packets.Kind.SERVICER_FAILURE: interfaces.Outcome.SERVICER_FAILURE,
packets.Kind.SERVICED_FAILURE: interfaces.SERVICED_FAILURE, packets.Kind.SERVICED_FAILURE: interfaces.Outcome.SERVICED_FAILURE,
} }
@enum.unique
class _Requirement(enum.Enum):
"""Symbols indicating events required for termination."""
EMISSION = 'emission'
TRANSMISSION = 'transmission'
INGESTION = 'ingestion'
_FRONT_NOT_LISTENING_REQUIREMENTS = (_Requirement.TRANSMISSION,)
_BACK_NOT_LISTENING_REQUIREMENTS = (
_Requirement.EMISSION, _Requirement.INGESTION,)
_LISTENING_REQUIREMENTS = (
_Requirement.TRANSMISSION, _Requirement.INGESTION,)
class _TerminationManager(_interfaces.TerminationManager): class _TerminationManager(_interfaces.TerminationManager):
"""An implementation of _interfaces.TerminationManager.""" """An implementation of _interfaces.TerminationManager."""
@ -68,9 +76,8 @@ class _TerminationManager(_interfaces.TerminationManager):
work_pool: A thread pool in which customer work will be done. work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done. utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination. action: An action to call on operation termination.
requirements: A combination of _EMISSION, _TRANSMISSION, and _INGESTION requirements: A combination of _Requirement values identifying what
identifying what must finish for the operation to be considered must finish for the operation to be considered completed.
completed.
local_failure: A packets.Kind specifying what constitutes local failure of local_failure: A packets.Kind specifying what constitutes local failure of
customer work. customer work.
""" """
@ -137,21 +144,21 @@ class _TerminationManager(_interfaces.TerminationManager):
def emission_complete(self): def emission_complete(self):
"""See superclass method for specification.""" """See superclass method for specification."""
if self._outstanding_requirements is not None: if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_EMISSION) self._outstanding_requirements.discard(_Requirement.EMISSION)
if not self._outstanding_requirements: if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION) self._terminate(packets.Kind.COMPLETION)
def transmission_complete(self): def transmission_complete(self):
"""See superclass method for specification.""" """See superclass method for specification."""
if self._outstanding_requirements is not None: if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_TRANSMISSION) self._outstanding_requirements.discard(_Requirement.TRANSMISSION)
if not self._outstanding_requirements: if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION) self._terminate(packets.Kind.COMPLETION)
def ingestion_complete(self): def ingestion_complete(self):
"""See superclass method for specification.""" """See superclass method for specification."""
if self._outstanding_requirements is not None: if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_INGESTION) self._outstanding_requirements.discard(_Requirement.INGESTION)
if not self._outstanding_requirements: if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION) self._terminate(packets.Kind.COMPLETION)
@ -163,39 +170,46 @@ class _TerminationManager(_interfaces.TerminationManager):
self._terminate(kind) self._terminate(kind)
def front_termination_manager(work_pool, utility_pool, action, subscription): def front_termination_manager(
work_pool, utility_pool, action, subscription_kind):
"""Creates a TerminationManager appropriate for front-side use. """Creates a TerminationManager appropriate for front-side use.
Args: Args:
work_pool: A thread pool in which customer work will be done. work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done. utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination. action: An action to call on operation termination.
subscription: One of interfaces.FULL, interfaces.termination_only, or subscription_kind: An interfaces.ServicedSubscription.Kind value.
interfaces.NONE.
Returns: Returns:
A TerminationManager appropriate for front-side use. A TerminationManager appropriate for front-side use.
""" """
if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
requirements = _FRONT_NOT_LISTENING_REQUIREMENTS
else:
requirements = _LISTENING_REQUIREMENTS
return _TerminationManager( return _TerminationManager(
work_pool, utility_pool, action, work_pool, utility_pool, action, requirements,
_FRONT_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else packets.Kind.SERVICED_FAILURE)
_LISTENING_REQUIREMENTS, packets.Kind.SERVICED_FAILURE)
def back_termination_manager(work_pool, utility_pool, action, subscription): def back_termination_manager(work_pool, utility_pool, action, subscription_kind):
"""Creates a TerminationManager appropriate for back-side use. """Creates a TerminationManager appropriate for back-side use.
Args: Args:
work_pool: A thread pool in which customer work will be done. work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done. utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination. action: An action to call on operation termination.
subscription: One of interfaces.FULL, interfaces.termination_only, or subscription_kind: An interfaces.ServicedSubscription.Kind value.
interfaces.NONE.
Returns: Returns:
A TerminationManager appropriate for back-side use. A TerminationManager appropriate for back-side use.
""" """
if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
requirements = _BACK_NOT_LISTENING_REQUIREMENTS
else:
requirements = _LISTENING_REQUIREMENTS
return _TerminationManager( return _TerminationManager(
work_pool, utility_pool, action, work_pool, utility_pool, action, requirements,
_BACK_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else packets.Kind.SERVICER_FAILURE)
_LISTENING_REQUIREMENTS, packets.Kind.SERVICER_FAILURE)

@ -91,20 +91,19 @@ class _Packetizer(object):
class _FrontPacketizer(_Packetizer): class _FrontPacketizer(_Packetizer):
"""Front-side packet-creating behavior.""" """Front-side packet-creating behavior."""
def __init__(self, name, subscription, trace_id, timeout): def __init__(self, name, subscription_kind, trace_id, timeout):
"""Constructor. """Constructor.
Args: Args:
name: The name of the operation. name: The name of the operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or subscription_kind: An interfaces.ServicedSubscription.Kind value
interfaces.NONE describing the interest the front has in packets sent describing the interest the front has in packets sent from the back.
from the back.
trace_id: A uuid.UUID identifying a set of related operations to which trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs. this operation belongs.
timeout: A length of time in seconds to allow for the entire operation. timeout: A length of time in seconds to allow for the entire operation.
""" """
self._name = name self._name = name
self._subscription = subscription self._subscription_kind = subscription_kind
self._trace_id = trace_id self._trace_id = trace_id
self._timeout = timeout self._timeout = timeout
@ -114,13 +113,13 @@ class _FrontPacketizer(_Packetizer):
return packets.FrontToBackPacket( return packets.FrontToBackPacket(
operation_id, sequence_number, operation_id, sequence_number,
packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION, packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
self._name, self._subscription, self._trace_id, payload, self._name, self._subscription_kind, self._trace_id, payload,
self._timeout) self._timeout)
else: else:
return packets.FrontToBackPacket( return packets.FrontToBackPacket(
operation_id, 0, operation_id, 0,
packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT,
self._name, self._subscription, self._trace_id, payload, self._name, self._subscription_kind, self._trace_id, payload,
self._timeout) self._timeout)
def packetize_abortion(self, operation_id, sequence_number, kind): def packetize_abortion(self, operation_id, sequence_number, kind):
@ -335,8 +334,8 @@ class _TransmittingTransmissionManager(TransmissionManager):
def front_transmission_manager( def front_transmission_manager(
lock, pool, callback, operation_id, name, subscription, trace_id, timeout, lock, pool, callback, operation_id, name, subscription_kind, trace_id,
termination_manager): timeout, termination_manager):
"""Creates a TransmissionManager appropriate for front-side use. """Creates a TransmissionManager appropriate for front-side use.
Args: Args:
@ -347,9 +346,8 @@ def front_transmission_manager(
of the operation. of the operation.
operation_id: The operation's ID. operation_id: The operation's ID.
name: The name of the operation. name: The name of the operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or subscription_kind: An interfaces.ServicedSubscription.Kind value
interfaces.NONE describing the interest the front has in packets sent describing the interest the front has in packets sent from the back.
from the back.
trace_id: A uuid.UUID identifying a set of related operations to which trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs. this operation belongs.
timeout: A length of time in seconds to allow for the entire operation. timeout: A length of time in seconds to allow for the entire operation.
@ -361,12 +359,13 @@ def front_transmission_manager(
""" """
return _TransmittingTransmissionManager( return _TransmittingTransmissionManager(
lock, pool, callback, operation_id, _FrontPacketizer( lock, pool, callback, operation_id, _FrontPacketizer(
name, subscription, trace_id, timeout), name, subscription_kind, trace_id, timeout),
termination_manager) termination_manager)
def back_transmission_manager( def back_transmission_manager(
lock, pool, callback, operation_id, termination_manager, subscription): lock, pool, callback, operation_id, termination_manager,
subscription_kind):
"""Creates a TransmissionManager appropriate for back-side use. """Creates a TransmissionManager appropriate for back-side use.
Args: Args:
@ -378,14 +377,13 @@ def back_transmission_manager(
operation_id: The operation's ID. operation_id: The operation's ID.
termination_manager: The _interfaces.TerminationManager associated with termination_manager: The _interfaces.TerminationManager associated with
this operation. this operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or subscription_kind: An interfaces.ServicedSubscription.Kind value
interfaces.NONE describing the interest the front has in packets sent from describing the interest the front has in packets sent from the back.
the back.
Returns: Returns:
A TransmissionManager appropriate for back-side use. A TransmissionManager appropriate for back-side use.
""" """
if subscription == interfaces.NONE: if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
return _EmptyTransmissionManager() return _EmptyTransmissionManager()
else: else:
return _TransmittingTransmissionManager( return _TransmittingTransmissionManager(

@ -71,10 +71,9 @@ class FrontToBackPacket(
Kind.RECEPTION_FAILURE, or Kind.TRANSMISSION_FAILURE. Kind.RECEPTION_FAILURE, or Kind.TRANSMISSION_FAILURE.
name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT
or Kind.ENTIRE. Must be None for any other kind. or Kind.ENTIRE. Must be None for any other kind.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or subscription: An interfaces.ServicedSubscription.Kind value describing the
interfaces.NONE describing the interest the front has in packets sent from interest the front has in packets sent from the back. Must be present if
the back. Must be present if kind is Kind.COMMENCEMENT or Kind.ENTIRE. kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind.
Must be None for any other kind.
trace_id: A uuid.UUID identifying a set of related operations to which this trace_id: A uuid.UUID identifying a set of related operations to which this
operation belongs. May be None. operation belongs. May be None.
payload: A customer payload object. Must be present if kind is payload: A customer payload object. Must be present if kind is

@ -36,13 +36,14 @@ from _framework.base import interfaces
class _ServicedSubscription( class _ServicedSubscription(
collections.namedtuple('_ServicedSubscription', ['category', 'ingestor']), collections.namedtuple('_ServicedSubscription', ['kind', 'ingestor']),
interfaces.ServicedSubscription): interfaces.ServicedSubscription):
"""See interfaces.ServicedSubscription for specification.""" """See interfaces.ServicedSubscription for specification."""
_NONE_SUBSCRIPTION = _ServicedSubscription(interfaces.NONE, None) _NONE_SUBSCRIPTION = _ServicedSubscription(
interfaces.ServicedSubscription.Kind.NONE, None)
_TERMINATION_ONLY_SUBSCRIPTION = _ServicedSubscription( _TERMINATION_ONLY_SUBSCRIPTION = _ServicedSubscription(
interfaces.TERMINATION_ONLY, None) interfaces.ServicedSubscription.Kind.TERMINATION_ONLY, None)
def none_serviced_subscription(): def none_serviced_subscription():
@ -72,12 +73,14 @@ def full_serviced_subscription(ingestor):
"""Creates a "full" interfaces.ServicedSubscription object. """Creates a "full" interfaces.ServicedSubscription object.
Args: Args:
ingestor: A ServicedIngestor. ingestor: An interfaces.ServicedIngestor.
Returns: Returns:
A ServicedSubscription object indicating a full subscription. An interfaces.ServicedSubscription object indicating a full
subscription.
""" """
return _ServicedSubscription(interfaces.FULL, ingestor) return _ServicedSubscription(
interfaces.ServicedSubscription.Kind.FULL, ingestor)
def wait_for_idle(end): def wait_for_idle(end):

@ -94,7 +94,7 @@ class _OperationCancellableIterator(interfaces.CancellableIterator):
def cancel(self): def cancel(self):
self._operation.cancel() self._operation.cancel()
self._rendezvous.set_outcome(base_interfaces.CANCELLED) self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED)
class _OperationFuture(future.Future): class _OperationFuture(future.Future):
@ -150,15 +150,12 @@ class _OperationFuture(future.Future):
"""Indicates to this object that the operation has terminated. """Indicates to this object that the operation has terminated.
Args: Args:
operation_outcome: One of base_interfaces.COMPLETED, operation_outcome: A base_interfaces.Outcome value indicating the
base_interfaces.CANCELLED, base_interfaces.EXPIRED, outcome of the operation.
base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
base_interfaces.SERVICED_FAILURE, or base_interfaces.SERVICER_FAILURE
indicating the categorical outcome of the operation.
""" """
with self._condition: with self._condition:
if (self._outcome is None and if (self._outcome is None and
operation_outcome != base_interfaces.COMPLETED): operation_outcome is not base_interfaces.Outcome.COMPLETED):
self._outcome = future.raised( self._outcome = future.raised(
_control.abortion_outcome_to_exception(operation_outcome)) _control.abortion_outcome_to_exception(operation_outcome))
self._condition.notify_all() self._condition.notify_all()

@ -40,13 +40,17 @@ from _framework.foundation import stream
INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-(' INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-('
_OPERATION_OUTCOME_TO_RPC_ABORTION = { _OPERATION_OUTCOME_TO_RPC_ABORTION = {
base_interfaces.CANCELLED: interfaces.CANCELLED, base_interfaces.Outcome.CANCELLED: interfaces.Abortion.CANCELLED,
base_interfaces.EXPIRED: interfaces.EXPIRED, base_interfaces.Outcome.EXPIRED: interfaces.Abortion.EXPIRED,
base_interfaces.RECEPTION_FAILURE: interfaces.NETWORK_FAILURE, base_interfaces.Outcome.RECEPTION_FAILURE:
base_interfaces.TRANSMISSION_FAILURE: interfaces.NETWORK_FAILURE, interfaces.Abortion.NETWORK_FAILURE,
base_interfaces.SERVICED_FAILURE: interfaces.SERVICED_FAILURE, base_interfaces.Outcome.TRANSMISSION_FAILURE:
base_interfaces.SERVICER_FAILURE: interfaces.SERVICER_FAILURE, interfaces.Abortion.NETWORK_FAILURE,
} base_interfaces.Outcome.SERVICED_FAILURE:
interfaces.Abortion.SERVICED_FAILURE,
base_interfaces.Outcome.SERVICER_FAILURE:
interfaces.Abortion.SERVICER_FAILURE,
}
def _as_operation_termination_callback(rpc_abortion_callback): def _as_operation_termination_callback(rpc_abortion_callback):
@ -59,13 +63,13 @@ def _as_operation_termination_callback(rpc_abortion_callback):
def _abortion_outcome_to_exception(abortion_outcome): def _abortion_outcome_to_exception(abortion_outcome):
if abortion_outcome == base_interfaces.CANCELLED: if abortion_outcome == base_interfaces.Outcome.CANCELLED:
return exceptions.CancellationError() return exceptions.CancellationError()
elif abortion_outcome == base_interfaces.EXPIRED: elif abortion_outcome == base_interfaces.Outcome.EXPIRED:
return exceptions.ExpirationError() return exceptions.ExpirationError()
elif abortion_outcome == base_interfaces.SERVICER_FAILURE: elif abortion_outcome == base_interfaces.Outcome.SERVICER_FAILURE:
return exceptions.ServicerError() return exceptions.ServicerError()
elif abortion_outcome == base_interfaces.SERVICED_FAILURE: elif abortion_outcome == base_interfaces.Outcome.SERVICED_FAILURE:
return exceptions.ServicedError() return exceptions.ServicedError()
else: else:
return exceptions.NetworkError() return exceptions.NetworkError()
@ -133,7 +137,7 @@ class Rendezvous(stream.Consumer):
def set_outcome(self, outcome): def set_outcome(self, outcome):
with self._condition: with self._condition:
if outcome != base_interfaces.COMPLETED: if outcome is not base_interfaces.Outcome.COMPLETED:
self._abortion = outcome self._abortion = outcome
self._condition.notify() self._condition.notify()

@ -30,6 +30,7 @@
"""Interfaces for the face layer of RPC Framework.""" """Interfaces for the face layer of RPC Framework."""
import abc import abc
import enum
# exceptions, abandonment, and future are referenced from specification in this # exceptions, abandonment, and future are referenced from specification in this
# module. # module.
@ -58,14 +59,15 @@ class CancellableIterator(object):
raise NotImplementedError() raise NotImplementedError()
# Constants that categorize RPC abortion. @enum.unique
# TODO(nathaniel): Learn and use Python's enum library for this de facto class Abortion(enum.Enum):
# enumerated type """Categories of RPC abortion."""
CANCELLED = 'abortion: cancelled'
EXPIRED = 'abortion: expired' CANCELLED = 'cancelled'
NETWORK_FAILURE = 'abortion: network failure' EXPIRED = 'expired'
SERVICED_FAILURE = 'abortion: serviced failure' NETWORK_FAILURE = 'network failure'
SERVICER_FAILURE = 'abortion: servicer failure' SERVICED_FAILURE = 'serviced failure'
SERVICER_FAILURE = 'servicer failure'
class RpcContext(object): class RpcContext(object):
@ -93,9 +95,8 @@ class RpcContext(object):
"""Registers a callback to be called if the RPC is aborted. """Registers a callback to be called if the RPC is aborted.
Args: Args:
abortion_callback: A callable to be called and passed one of CANCELLED, abortion_callback: A callable to be called and passed an Abortion value
EXPIRED, NETWORK_FAILURE, SERVICED_FAILURE, or SERVICER_FAILURE in the in the event of RPC abortion.
event of RPC abortion.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -474,9 +475,8 @@ class Stub(object):
request: The request value for the RPC. request: The request value for the RPC.
response_callback: A callback to be called to accept the response value response_callback: A callback to be called to accept the response value
of the RPC. of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED, abortion_callback: A callback to be called and passed an Abortion value
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC in the event of RPC abortion.
abortion.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
Returns: Returns:
@ -494,9 +494,8 @@ class Stub(object):
request: The request value for the RPC. request: The request value for the RPC.
response_consumer: A stream.Consumer to be called to accept the response response_consumer: A stream.Consumer to be called to accept the response
values of the RPC. values of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED, abortion_callback: A callback to be called and passed an Abortion value
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC in the event of RPC abortion.
abortion.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
Returns: Returns:
@ -513,9 +512,8 @@ class Stub(object):
name: The RPC method name. name: The RPC method name.
response_callback: A callback to be called to accept the response value response_callback: A callback to be called to accept the response value
of the RPC. of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED, abortion_callback: A callback to be called and passed an Abortion value
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC in the event of RPC abortion.
abortion.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
Returns: Returns:
@ -533,9 +531,8 @@ class Stub(object):
name: The RPC method name. name: The RPC method name.
response_consumer: A stream.Consumer to be called to accept the response response_consumer: A stream.Consumer to be called to accept the response
values of the RPC. values of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED, abortion_callback: A callback to be called and passed an Abortion value
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC in the event of RPC abortion.
abortion.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
Returns: Returns:

@ -176,7 +176,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback.complete, callback.abort, _TIMEOUT) name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion()) self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredUnaryRequestStreamResponse(self): def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -190,7 +190,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback, callback.abort, _TIMEOUT) name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion()) self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredStreamRequestUnaryResponse(self): def testExpiredStreamRequestUnaryResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -202,7 +202,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, callback.complete, callback.abort, _TIMEOUT) name, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion()) self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredStreamRequestStreamResponse(self): def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -217,7 +217,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.consume(request) request_consumer.consume(request)
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion()) self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testFailedUnaryRequestUnaryResponse(self): def testFailedUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -231,7 +231,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback.complete, callback.abort, _TIMEOUT) name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedUnaryRequestStreamResponse(self): def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -245,7 +245,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback, callback.abort, _TIMEOUT) name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedStreamRequestUnaryResponse(self): def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -262,7 +262,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.terminate() request_consumer.terminate()
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedStreamRequestStreamResponse(self): def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -279,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.terminate() request_consumer.terminate()
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testParallelInvocations(self): def testParallelInvocations(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -321,7 +321,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel() call.cancel()
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion()) self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledUnaryRequestStreamResponse(self): def testCancelledUnaryRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -335,7 +335,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel() call.cancel()
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion()) self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledStreamRequestUnaryResponse(self): def testCancelledStreamRequestUnaryResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -351,7 +351,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel() call.cancel()
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion()) self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledStreamRequestStreamResponse(self): def testCancelledStreamRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -364,4 +364,4 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel() call.cancel()
callback.block_until_terminated() callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion()) self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())

Loading…
Cancel
Save