Merge pull request #1055 from nathanielmanistaatgoogle/python-refactoring

Python refactoring
pull/1058/head
Masood Malekghassemi 10 years ago
commit afdc24ae2b
  1. 5
      src/python/src/grpc/framework/base/packets/_cancellation.py
  2. 8
      src/python/src/grpc/framework/base/packets/_context.py
  3. 22
      src/python/src/grpc/framework/base/packets/_emission.py
  4. 16
      src/python/src/grpc/framework/base/packets/_ends.py
  5. 6
      src/python/src/grpc/framework/base/packets/_expiration.py
  6. 40
      src/python/src/grpc/framework/base/packets/_ingestion.py
  7. 8
      src/python/src/grpc/framework/base/packets/_interfaces.py
  8. 55
      src/python/src/grpc/framework/base/packets/_reception.py
  9. 48
      src/python/src/grpc/framework/base/packets/_termination.py
  10. 63
      src/python/src/grpc/framework/base/packets/_transmission.py

@ -29,6 +29,7 @@
"""State and behavior for operation cancellation.""" """State and behavior for operation cancellation."""
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets from grpc.framework.base.packets import packets
@ -58,7 +59,7 @@ class CancellationManager(_interfaces.CancellationManager):
def cancel(self): def cancel(self):
"""See _interfaces.CancellationManager.cancel for specification.""" """See _interfaces.CancellationManager.cancel for specification."""
with self._lock: with self._lock:
self._termination_manager.abort(packets.Kind.CANCELLATION) self._termination_manager.abort(base_interfaces.Outcome.CANCELLED)
self._transmission_manager.abort(packets.Kind.CANCELLATION) self._transmission_manager.abort(base_interfaces.Outcome.CANCELLED)
self._ingestion_manager.abort() self._ingestion_manager.abort()
self._expiration_manager.abort() self._expiration_manager.abort()

@ -31,10 +31,9 @@
import time import time
# _interfaces and packets are referenced from specification in this module. # _interfaces is referenced from specification in this module.
from grpc.framework.base import interfaces as base_interfaces from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import
from grpc.framework.base.packets import packets # pylint: disable=unused-import
class OperationContext(base_interfaces.OperationContext): class OperationContext(base_interfaces.OperationContext):
@ -48,8 +47,9 @@ class OperationContext(base_interfaces.OperationContext):
Args: Args:
lock: The operation-wide lock. lock: The operation-wide lock.
operation_id: An object identifying the operation. operation_id: An object identifying the operation.
local_failure: Whichever one of packets.Kind.SERVICED_FAILURE or local_failure: Whichever one of base_interfaces.Outcome.SERVICED_FAILURE
packets.Kind.SERVICER_FAILURE describes local failure of customer code. or base_interfaces.Outcome.SERVICER_FAILURE describes local failure of
customer code.
termination_manager: The _interfaces.TerminationManager for the operation. termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the transmission_manager: The _interfaces.TransmissionManager for the
operation. operation.

@ -29,29 +29,29 @@
"""State and behavior for handling emitted values.""" """State and behavior for handling emitted values."""
# packets is referenced from specifications in this module. from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets # pylint: disable=unused-import
class _EmissionManager(_interfaces.EmissionManager): class _EmissionManager(_interfaces.EmissionManager):
"""An implementation of _interfaces.EmissionManager.""" """An implementation of _interfaces.EmissionManager."""
def __init__( def __init__(
self, lock, failure_kind, termination_manager, transmission_manager): self, lock, failure_outcome, termination_manager, transmission_manager):
"""Constructor. """Constructor.
Args: Args:
lock: The operation-wide lock. lock: The operation-wide lock.
failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or failure_outcome: Whichever one of
packets.Kind.SERVICER_FAILURE describes this object's methods being base_interfaces.Outcome.SERVICED_FAILURE or
called inappropriately by customer code. base_interfaces.Outcome.SERVICER_FAILURE describes this object's
methods being called inappropriately by customer code.
termination_manager: The _interfaces.TerminationManager for the operation. termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the transmission_manager: The _interfaces.TransmissionManager for the
operation. operation.
""" """
self._lock = lock self._lock = lock
self._failure_kind = failure_kind self._failure_outcome = failure_outcome
self._termination_manager = termination_manager self._termination_manager = termination_manager
self._transmission_manager = transmission_manager self._transmission_manager = transmission_manager
self._ingestion_manager = None self._ingestion_manager = None
@ -65,8 +65,8 @@ class _EmissionManager(_interfaces.EmissionManager):
self._expiration_manager = expiration_manager self._expiration_manager = expiration_manager
def _abort(self): def _abort(self):
self._termination_manager.abort(self._failure_kind) self._termination_manager.abort(self._failure_outcome)
self._transmission_manager.abort(self._failure_kind) self._transmission_manager.abort(self._failure_outcome)
self._ingestion_manager.abort() self._ingestion_manager.abort()
self._expiration_manager.abort() self._expiration_manager.abort()
@ -106,7 +106,7 @@ def front_emission_manager(lock, termination_manager, transmission_manager):
An _interfaces.EmissionManager appropriate for front-side use. An _interfaces.EmissionManager appropriate for front-side use.
""" """
return _EmissionManager( return _EmissionManager(
lock, packets.Kind.SERVICED_FAILURE, termination_manager, lock, base_interfaces.Outcome.SERVICED_FAILURE, termination_manager,
transmission_manager) transmission_manager)
@ -122,5 +122,5 @@ def back_emission_manager(lock, termination_manager, transmission_manager):
An _interfaces.EmissionManager appropriate for back-side use. An _interfaces.EmissionManager appropriate for back-side use.
""" """
return _EmissionManager( return _EmissionManager(
lock, packets.Kind.SERVICER_FAILURE, termination_manager, lock, base_interfaces.Outcome.SERVICER_FAILURE, termination_manager,
transmission_manager) transmission_manager)

@ -50,16 +50,6 @@ from grpc.framework.foundation import callable_util
_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' _IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
_OPERATION_OUTCOMES = (
base_interfaces.Outcome.COMPLETED,
base_interfaces.Outcome.CANCELLED,
base_interfaces.Outcome.EXPIRED,
base_interfaces.Outcome.RECEPTION_FAILURE,
base_interfaces.Outcome.TRANSMISSION_FAILURE,
base_interfaces.Outcome.SERVICER_FAILURE,
base_interfaces.Outcome.SERVICED_FAILURE,
)
class _EasyOperation(base_interfaces.Operation): class _EasyOperation(base_interfaces.Operation):
"""A trivial implementation of base_interfaces.Operation.""" """A trivial implementation of base_interfaces.Operation."""
@ -98,7 +88,7 @@ class _Endlette(object):
# indicates an in-progress fire-and-forget operation for which the customer # indicates an in-progress fire-and-forget operation for which the customer
# has chosen to ignore results. # has chosen to ignore results.
self._operations = {} self._operations = {}
self._stats = {outcome: 0 for outcome in _OPERATION_OUTCOMES} self._stats = {outcome: 0 for outcome in base_interfaces.Outcome}
self._idle_actions = [] self._idle_actions = []
def terminal_action(self, operation_id): def terminal_action(self, operation_id):
@ -198,7 +188,7 @@ def _front_operate(
lock, transmission_pool, callback, operation_id, name, lock, transmission_pool, callback, operation_id, name,
subscription.kind, trace_id, timeout, termination_manager) subscription.kind, trace_id, timeout, termination_manager)
operation_context = _context.OperationContext( operation_context = _context.OperationContext(
lock, operation_id, packets.Kind.SERVICED_FAILURE, lock, operation_id, base_interfaces.Outcome.SERVICED_FAILURE,
termination_manager, transmission_manager) termination_manager, transmission_manager)
emission_manager = _emission.front_emission_manager( emission_manager = _emission.front_emission_manager(
lock, termination_manager, transmission_manager) lock, termination_manager, transmission_manager)
@ -327,7 +317,7 @@ def _back_operate(
lock, transmission_pool, callback, ticket.operation_id, lock, transmission_pool, callback, ticket.operation_id,
termination_manager, ticket.subscription) termination_manager, ticket.subscription)
operation_context = _context.OperationContext( operation_context = _context.OperationContext(
lock, ticket.operation_id, packets.Kind.SERVICER_FAILURE, lock, ticket.operation_id, base_interfaces.Outcome.SERVICER_FAILURE,
termination_manager, transmission_manager) termination_manager, transmission_manager)
emission_manager = _emission.back_emission_manager( emission_manager = _emission.back_emission_manager(
lock, termination_manager, transmission_manager) lock, termination_manager, transmission_manager)

@ -31,8 +31,8 @@
import time import time
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets
from grpc.framework.foundation import later from grpc.framework.foundation import later
@ -73,8 +73,8 @@ class _ExpirationManager(_interfaces.ExpirationManager):
with self._lock: with self._lock:
if self._future is not None and index == self._index: if self._future is not None and index == self._index:
self._future = None self._future = None
self._termination_manager.abort(packets.Kind.EXPIRATION) self._termination_manager.abort(base_interfaces.Outcome.EXPIRED)
self._transmission_manager.abort(packets.Kind.EXPIRATION) self._transmission_manager.abort(base_interfaces.Outcome.EXPIRED)
self._ingestion_manager.abort() self._ingestion_manager.abort()
def start(self): def start(self):

@ -206,7 +206,7 @@ class _IngestionManager(_interfaces.IngestionManager):
"""An implementation of _interfaces.IngestionManager.""" """An implementation of _interfaces.IngestionManager."""
def __init__( def __init__(
self, lock, pool, consumer_creator, failure_kind, termination_manager, self, lock, pool, consumer_creator, failure_outcome, termination_manager,
transmission_manager): transmission_manager):
"""Constructor. """Constructor.
@ -216,8 +216,10 @@ class _IngestionManager(_interfaces.IngestionManager):
consumer_creator: A _ConsumerCreator wrapping the portion of customer code consumer_creator: A _ConsumerCreator wrapping the portion of customer code
that when called returns the stream.Consumer with which the customer that when called returns the stream.Consumer with which the customer
code will ingest payload values. code will ingest payload values.
failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or failure_outcome: Whichever one of
packets.Kind.SERVICER_FAILURE describes local failure of customer code. interfaces.Outcome.SERVICED_FAILURE or
interfaces.Outcome.SERVICER_FAILURE describes local failure of
customer code.
termination_manager: The _interfaces.TerminationManager for the operation. termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the transmission_manager: The _interfaces.TransmissionManager for the
operation. operation.
@ -225,7 +227,7 @@ class _IngestionManager(_interfaces.IngestionManager):
self._lock = lock self._lock = lock
self._pool = pool self._pool = pool
self._consumer_creator = consumer_creator self._consumer_creator = consumer_creator
self._failure_kind = failure_kind self._failure_outcome = failure_outcome
self._termination_manager = termination_manager self._termination_manager = termination_manager
self._transmission_manager = transmission_manager self._transmission_manager = transmission_manager
self._expiration_manager = None self._expiration_manager = None
@ -299,12 +301,12 @@ class _IngestionManager(_interfaces.IngestionManager):
else: else:
with self._lock: with self._lock:
if self._pending_ingestion is not None: if self._pending_ingestion is not None:
self._abort_and_notify(self._failure_kind) self._abort_and_notify(self._failure_outcome)
self._processing = False self._processing = False
return return
else: else:
with self._lock: with self._lock:
self._abort_and_notify(self._failure_kind) self._abort_and_notify(self._failure_outcome)
self._processing = False self._processing = False
return return
@ -316,16 +318,16 @@ class _IngestionManager(_interfaces.IngestionManager):
_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement) _CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement)
if consumer_creation_outcome.return_value is None: if consumer_creation_outcome.return_value is None:
with self._lock: with self._lock:
self._abort_and_notify(self._failure_kind) self._abort_and_notify(self._failure_outcome)
self._processing = False self._processing = False
elif consumer_creation_outcome.return_value.remote_error: elif consumer_creation_outcome.return_value.remote_error:
with self._lock: with self._lock:
self._abort_and_notify(packets.Kind.RECEPTION_FAILURE) self._abort_and_notify(interfaces.Outcome.RECEPTION_FAILURE)
self._processing = False self._processing = False
elif consumer_creation_outcome.return_value.abandoned: elif consumer_creation_outcome.return_value.abandoned:
with self._lock: with self._lock:
if self._pending_ingestion is not None: if self._pending_ingestion is not None:
self._abort_and_notify(self._failure_kind) self._abort_and_notify(self._failure_outcome)
self._processing = False self._processing = False
else: else:
wrapped_ingestion_consumer = _WrappedConsumer( wrapped_ingestion_consumer = _WrappedConsumer(
@ -346,7 +348,7 @@ class _IngestionManager(_interfaces.IngestionManager):
def consume(self, payload): def consume(self, payload):
if self._ingestion_complete: if self._ingestion_complete:
self._abort_and_notify(self._failure_kind) self._abort_and_notify(self._failure_outcome)
elif self._pending_ingestion is not None: elif self._pending_ingestion is not None:
if self._processing: if self._processing:
self._pending_ingestion.append(payload) self._pending_ingestion.append(payload)
@ -359,7 +361,7 @@ class _IngestionManager(_interfaces.IngestionManager):
def terminate(self): def terminate(self):
if self._ingestion_complete: if self._ingestion_complete:
self._abort_and_notify(self._failure_kind) self._abort_and_notify(self._failure_outcome)
else: else:
self._ingestion_complete = True self._ingestion_complete = True
if self._pending_ingestion is not None and not self._processing: if self._pending_ingestion is not None and not self._processing:
@ -371,7 +373,7 @@ class _IngestionManager(_interfaces.IngestionManager):
def consume_and_terminate(self, payload): def consume_and_terminate(self, payload):
if self._ingestion_complete: if self._ingestion_complete:
self._abort_and_notify(self._failure_kind) self._abort_and_notify(self._failure_outcome)
else: else:
self._ingestion_complete = True self._ingestion_complete = True
if self._pending_ingestion is not None: if self._pending_ingestion is not None:
@ -397,19 +399,20 @@ def front_ingestion_manager(
Args: Args:
lock: The operation-wide lock. lock: The operation-wide lock.
pool: A thread pool in which to execute customer code. pool: A thread pool in which to execute customer code.
subscription: A base_interfaces.ServicedSubscription indicating the subscription: A interfaces.ServicedSubscription indicating the
customer's interest in the results of the operation. customer's interest in the results of the operation.
termination_manager: The _interfaces.TerminationManager for the operation. termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the transmission_manager: The _interfaces.TransmissionManager for the
operation. operation.
operation_context: A base_interfaces.OperationContext for the operation. operation_context: A interfaces.OperationContext for the operation.
Returns: Returns:
An IngestionManager appropriate for front-side use. An IngestionManager appropriate for front-side use.
""" """
ingestion_manager = _IngestionManager( ingestion_manager = _IngestionManager(
lock, pool, _FrontConsumerCreator(subscription, operation_context), lock, pool, _FrontConsumerCreator(subscription, operation_context),
packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager) interfaces.Outcome.SERVICED_FAILURE, termination_manager,
transmission_manager)
ingestion_manager.start(None) ingestion_manager.start(None)
return ingestion_manager return ingestion_manager
@ -422,11 +425,11 @@ def back_ingestion_manager(
Args: Args:
lock: The operation-wide lock. lock: The operation-wide lock.
pool: A thread pool in which to execute customer code. pool: A thread pool in which to execute customer code.
servicer: A base_interfaces.Servicer for servicing the operation. servicer: A interfaces.Servicer for servicing the operation.
termination_manager: The _interfaces.TerminationManager for the operation. termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the transmission_manager: The _interfaces.TransmissionManager for the
operation. operation.
operation_context: A base_interfaces.OperationContext for the operation. operation_context: A interfaces.OperationContext for the operation.
emission_consumer: The _interfaces.EmissionConsumer for the operation. emission_consumer: The _interfaces.EmissionConsumer for the operation.
Returns: Returns:
@ -435,5 +438,6 @@ def back_ingestion_manager(
ingestion_manager = _IngestionManager( ingestion_manager = _IngestionManager(
lock, pool, _BackConsumerCreator( lock, pool, _BackConsumerCreator(
servicer, operation_context, emission_consumer), servicer, operation_context, emission_consumer),
packets.Kind.SERVICER_FAILURE, termination_manager, transmission_manager) interfaces.Outcome.SERVICER_FAILURE, termination_manager,
transmission_manager)
return ingestion_manager return ingestion_manager

@ -83,11 +83,11 @@ class TerminationManager(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def abort(self, kind): def abort(self, outcome):
"""Indicates that the operation must abort for the indicated reason. """Indicates that the operation must abort for the indicated reason.
Args: Args:
kind: A value of packets.Kind indicating operation abortion. outcome: A base_interfaces.Outcome indicating operation abortion.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -109,11 +109,11 @@ class TransmissionManager(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def abort(self, kind): def abort(self, outcome):
"""Indicates that the operation has aborted for the indicated reason. """Indicates that the operation has aborted for the indicated reason.
Args: Args:
kind: A value of packets.Kind indicating operation abortion. outcome: A base_interfaces.Outcome indicating operation abortion.
""" """
raise NotImplementedError() raise NotImplementedError()

@ -31,6 +31,7 @@
import abc import abc
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets from grpc.framework.base.packets import packets
@ -72,11 +73,11 @@ class _Receiver(object):
def _abort( def _abort(
category, termination_manager, transmission_manager, ingestion_manager, outcome, termination_manager, transmission_manager, ingestion_manager,
expiration_manager): expiration_manager):
"""Indicates abortion with the given category to the given managers.""" """Indicates abortion with the given outcome to the given managers."""
termination_manager.abort(category) termination_manager.abort(outcome)
transmission_manager.abort(category) transmission_manager.abort(outcome)
ingestion_manager.abort() ingestion_manager.abort()
expiration_manager.abort() expiration_manager.abort()
@ -88,9 +89,9 @@ def _abort_if_abortive(
Args: Args:
packet: A just-arrived packet. packet: A just-arrived packet.
abortive: A callable that takes a packet and returns an operation category abortive: A callable that takes a packet and returns a
indicating that the operation should be aborted or None indicating that base_interfaces.Outcome indicating that the operation should be aborted
the operation should not be aborted. or None indicating that the operation should not be aborted.
termination_manager: The operation's _interfaces.TerminationManager. termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager. transmission_manager: The operation's _interfaces.TransmissionManager.
ingestion_manager: The operation's _interfaces.IngestionManager. ingestion_manager: The operation's _interfaces.IngestionManager.
@ -99,12 +100,12 @@ def _abort_if_abortive(
Returns: Returns:
True if the operation was aborted; False otherwise. True if the operation was aborted; False otherwise.
""" """
abort_category = abortive(packet) abortion_outcome = abortive(packet)
if abort_category is None: if abortion_outcome is None:
return False return False
else: else:
_abort( _abort(
abort_category, termination_manager, transmission_manager, abortion_outcome, termination_manager, transmission_manager,
ingestion_manager, expiration_manager) ingestion_manager, expiration_manager)
return True return True
@ -114,8 +115,8 @@ def _reception_failure(
expiration_manager): expiration_manager):
"""Aborts the operation with an indication of reception failure.""" """Aborts the operation with an indication of reception failure."""
_abort( _abort(
packets.Kind.RECEPTION_FAILURE, termination_manager, transmission_manager, base_interfaces.Outcome.RECEPTION_FAILURE, termination_manager,
ingestion_manager, expiration_manager) transmission_manager, ingestion_manager, expiration_manager)
class _BackReceiver(_Receiver): class _BackReceiver(_Receiver):
@ -147,23 +148,22 @@ class _BackReceiver(_Receiver):
packet: A just-arrived packet. packet: A just-arrived packet.
Returns: Returns:
One of packets.Kind.CANCELLATION, packets.Kind.SERVICED_FAILURE, or A base_interfaces.Outcome value describing operation abortion if the
packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive packet is abortive or None if the packet is not abortive.
and how, or None, indicating that the packet is not abortive.
""" """
if packet.kind is packets.Kind.CANCELLATION: if packet.kind is packets.Kind.CANCELLATION:
return packets.Kind.CANCELLATION return base_interfaces.Outcome.CANCELLED
elif packet.kind is packets.Kind.EXPIRATION: elif packet.kind is packets.Kind.EXPIRATION:
return packets.Kind.EXPIRATION return base_interfaces.Outcome.EXPIRED
elif packet.kind is packets.Kind.SERVICED_FAILURE: elif packet.kind is packets.Kind.SERVICED_FAILURE:
return packets.Kind.SERVICED_FAILURE return base_interfaces.Outcome.SERVICED_FAILURE
elif packet.kind is packets.Kind.RECEPTION_FAILURE: elif packet.kind is packets.Kind.RECEPTION_FAILURE:
return packets.Kind.SERVICED_FAILURE return base_interfaces.Outcome.SERVICED_FAILURE
elif (packet.kind in (packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE) and elif (packet.kind in (packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE) and
self._first_packet_seen): self._first_packet_seen):
return packets.Kind.RECEPTION_FAILURE return base_interfaces.Outcome.RECEPTION_FAILURE
elif self._last_packet_seen: elif self._last_packet_seen:
return packets.Kind.RECEPTION_FAILURE return base_interfaces.Outcome.RECEPTION_FAILURE
else: else:
return None return None
@ -236,18 +236,17 @@ class _FrontReceiver(_Receiver):
packet: A just-arrived packet. packet: A just-arrived packet.
Returns: Returns:
One of packets.Kind.EXPIRATION, packets.Kind.SERVICER_FAILURE, or A base_interfaces.Outcome value describing operation abortion if the
packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive packet is abortive or None if the packet is not abortive.
and how, or None, indicating that the packet is not abortive.
""" """
if packet.kind is packets.Kind.EXPIRATION: if packet.kind is packets.Kind.EXPIRATION:
return packets.Kind.EXPIRATION return base_interfaces.Outcome.EXPIRED
elif packet.kind is packets.Kind.SERVICER_FAILURE: elif packet.kind is packets.Kind.SERVICER_FAILURE:
return packets.Kind.SERVICER_FAILURE return base_interfaces.Outcome.SERVICER_FAILURE
elif packet.kind is packets.Kind.RECEPTION_FAILURE: elif packet.kind is packets.Kind.RECEPTION_FAILURE:
return packets.Kind.SERVICER_FAILURE return base_interfaces.Outcome.SERVICER_FAILURE
elif self._last_packet_seen: elif self._last_packet_seen:
return packets.Kind.RECEPTION_FAILURE return base_interfaces.Outcome.RECEPTION_FAILURE
else: else:
return None return None

@ -34,21 +34,10 @@ import enum
from grpc.framework.base import interfaces from grpc.framework.base import interfaces
from grpc.framework.base.packets import _constants from grpc.framework.base.packets import _constants
from grpc.framework.base.packets import _interfaces from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets
from grpc.framework.foundation import callable_util from grpc.framework.foundation import callable_util
_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!' _CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
_KINDS_TO_OUTCOMES = {
packets.Kind.COMPLETION: interfaces.Outcome.COMPLETED,
packets.Kind.CANCELLATION: interfaces.Outcome.CANCELLED,
packets.Kind.EXPIRATION: interfaces.Outcome.EXPIRED,
packets.Kind.RECEPTION_FAILURE: interfaces.Outcome.RECEPTION_FAILURE,
packets.Kind.TRANSMISSION_FAILURE: interfaces.Outcome.TRANSMISSION_FAILURE,
packets.Kind.SERVICER_FAILURE: interfaces.Outcome.SERVICER_FAILURE,
packets.Kind.SERVICED_FAILURE: interfaces.Outcome.SERVICED_FAILURE,
}
@enum.unique @enum.unique
class _Requirement(enum.Enum): class _Requirement(enum.Enum):
@ -78,8 +67,8 @@ class _TerminationManager(_interfaces.TerminationManager):
action: An action to call on operation termination. action: An action to call on operation termination.
requirements: A combination of _Requirement values identifying what requirements: A combination of _Requirement values identifying what
must finish for the operation to be considered completed. must finish for the operation to be considered completed.
local_failure: A packets.Kind specifying what constitutes local failure of local_failure: An interfaces.Outcome specifying what constitutes local
customer work. failure of customer work.
""" """
self._work_pool = work_pool self._work_pool = work_pool
self._utility_pool = utility_pool self._utility_pool = utility_pool
@ -89,27 +78,23 @@ class _TerminationManager(_interfaces.TerminationManager):
self._expiration_manager = None self._expiration_manager = None
self._outstanding_requirements = set(requirements) self._outstanding_requirements = set(requirements)
self._kind = None self._outcome = None
self._callbacks = [] self._callbacks = []
def set_expiration_manager(self, expiration_manager): def set_expiration_manager(self, expiration_manager):
self._expiration_manager = expiration_manager self._expiration_manager = expiration_manager
def _terminate(self, kind): def _terminate(self, outcome):
"""Terminates the operation. """Terminates the operation.
Args: Args:
kind: One of packets.Kind.COMPLETION, packets.Kind.CANCELLATION, outcome: An interfaces.Outcome describing the outcome of the operation.
packets.Kind.EXPIRATION, packets.Kind.RECEPTION_FAILURE,
packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or
packets.Kind.SERVICED_FAILURE.
""" """
self._expiration_manager.abort() self._expiration_manager.abort()
self._outstanding_requirements = None self._outstanding_requirements = None
callbacks = list(self._callbacks) callbacks = list(self._callbacks)
self._callbacks = None self._callbacks = None
self._kind = kind self._outcome = outcome
outcome = _KINDS_TO_OUTCOMES[kind]
act = callable_util.with_exceptions_logged( act = callable_util.with_exceptions_logged(
self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
@ -122,7 +107,7 @@ class _TerminationManager(_interfaces.TerminationManager):
callback_outcome = callable_util.call_logging_exceptions( callback_outcome = callable_util.call_logging_exceptions(
callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome) callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome)
if callback_outcome.exception is not None: if callback_outcome.exception is not None:
outcome = _KINDS_TO_OUTCOMES[self._local_failure] outcome = self._local_failure
break break
self._utility_pool.submit(act, outcome) self._utility_pool.submit(act, outcome)
@ -141,8 +126,7 @@ class _TerminationManager(_interfaces.TerminationManager):
if self._outstanding_requirements is None: if self._outstanding_requirements is None:
self._work_pool.submit( self._work_pool.submit(
callable_util.with_exceptions_logged( callable_util.with_exceptions_logged(
callback, _CALLBACK_EXCEPTION_LOG_MESSAGE), callback, _CALLBACK_EXCEPTION_LOG_MESSAGE), self._outcome)
_KINDS_TO_OUTCOMES[self._kind])
else: else:
self._callbacks.append(callback) self._callbacks.append(callback)
@ -151,28 +135,28 @@ class _TerminationManager(_interfaces.TerminationManager):
if self._outstanding_requirements is not None: if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_Requirement.EMISSION) self._outstanding_requirements.discard(_Requirement.EMISSION)
if not self._outstanding_requirements: if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION) self._terminate(interfaces.Outcome.COMPLETED)
def transmission_complete(self): def transmission_complete(self):
"""See superclass method for specification.""" """See superclass method for specification."""
if self._outstanding_requirements is not None: if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_Requirement.TRANSMISSION) self._outstanding_requirements.discard(_Requirement.TRANSMISSION)
if not self._outstanding_requirements: if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION) self._terminate(interfaces.Outcome.COMPLETED)
def ingestion_complete(self): def ingestion_complete(self):
"""See superclass method for specification.""" """See superclass method for specification."""
if self._outstanding_requirements is not None: if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_Requirement.INGESTION) self._outstanding_requirements.discard(_Requirement.INGESTION)
if not self._outstanding_requirements: if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION) self._terminate(interfaces.Outcome.COMPLETED)
def abort(self, kind): def abort(self, outcome):
"""See _interfaces.TerminationManager.abort for specification.""" """See _interfaces.TerminationManager.abort for specification."""
if kind == self._local_failure: if outcome is self._local_failure:
self._has_failed_locally = True self._has_failed_locally = True
if self._outstanding_requirements is not None: if self._outstanding_requirements is not None:
self._terminate(kind) self._terminate(outcome)
def front_termination_manager( def front_termination_manager(
@ -195,7 +179,7 @@ def front_termination_manager(
return _TerminationManager( return _TerminationManager(
work_pool, utility_pool, action, requirements, work_pool, utility_pool, action, requirements,
packets.Kind.SERVICED_FAILURE) interfaces.Outcome.SERVICED_FAILURE)
def back_termination_manager(work_pool, utility_pool, action, subscription_kind): def back_termination_manager(work_pool, utility_pool, action, subscription_kind):
@ -217,4 +201,4 @@ def back_termination_manager(work_pool, utility_pool, action, subscription_kind)
return _TerminationManager( return _TerminationManager(
work_pool, utility_pool, action, requirements, work_pool, utility_pool, action, requirements,
packets.Kind.SERVICER_FAILURE) interfaces.Outcome.SERVICER_FAILURE)

@ -39,14 +39,24 @@ from grpc.framework.foundation import callable_util
_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' _TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
_FRONT_TO_BACK_NO_TRANSMISSION_KINDS = ( _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES = (
packets.Kind.SERVICER_FAILURE, interfaces.Outcome.SERVICER_FAILURE,
) )
_BACK_TO_FRONT_NO_TRANSMISSION_KINDS = ( _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = (
packets.Kind.CANCELLATION, interfaces.Outcome.CANCELLED,
packets.Kind.SERVICED_FAILURE, interfaces.Outcome.SERVICED_FAILURE,
) )
_ABORTION_OUTCOME_TO_PACKET_KIND = {
interfaces.Outcome.CANCELLED: packets.Kind.CANCELLATION,
interfaces.Outcome.EXPIRED: packets.Kind.EXPIRATION,
interfaces.Outcome.RECEPTION_FAILURE: packets.Kind.RECEPTION_FAILURE,
interfaces.Outcome.TRANSMISSION_FAILURE: packets.Kind.TRANSMISSION_FAILURE,
interfaces.Outcome.SERVICED_FAILURE: packets.Kind.SERVICED_FAILURE,
interfaces.Outcome.SERVICER_FAILURE: packets.Kind.SERVICER_FAILURE,
}
class _Packetizer(object): class _Packetizer(object):
"""Common specification of different packet-creating behavior.""" """Common specification of different packet-creating behavior."""
@ -72,18 +82,18 @@ class _Packetizer(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def packetize_abortion(self, operation_id, sequence_number, kind): def packetize_abortion(self, operation_id, sequence_number, outcome):
"""Creates a packet indicating that the operation is aborted. """Creates a packet indicating that the operation is aborted.
Args: Args:
operation_id: The operation ID for the current operation. operation_id: The operation ID for the current operation.
sequence_number: A sequence number for the packet. sequence_number: A sequence number for the packet.
kind: One of the values of packets.Kind indicating operational abortion. outcome: An interfaces.Outcome value describing the operation abortion.
Returns: Returns:
An object of an appropriate type suitable for transmission to the other An object of an appropriate type suitable for transmission to the other
side of the operation, or None if transmission is not appropriate for side of the operation, or None if transmission is not appropriate for
the given kind. the given outcome.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -122,11 +132,12 @@ class _FrontPacketizer(_Packetizer):
self._name, self._subscription_kind, self._trace_id, payload, self._name, self._subscription_kind, self._trace_id, payload,
self._timeout) self._timeout)
def packetize_abortion(self, operation_id, sequence_number, kind): def packetize_abortion(self, operation_id, sequence_number, outcome):
"""See _Packetizer.packetize_abortion for specification.""" """See _Packetizer.packetize_abortion for specification."""
if kind in _FRONT_TO_BACK_NO_TRANSMISSION_KINDS: if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES:
return None return None
else: else:
kind = _ABORTION_OUTCOME_TO_PACKET_KIND[outcome]
return packets.FrontToBackPacket( return packets.FrontToBackPacket(
operation_id, sequence_number, kind, None, None, None, None, None) operation_id, sequence_number, kind, None, None, None, None, None)
@ -141,11 +152,12 @@ class _BackPacketizer(_Packetizer):
packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION, packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
payload) payload)
def packetize_abortion(self, operation_id, sequence_number, kind): def packetize_abortion(self, operation_id, sequence_number, outcome):
"""See _Packetizer.packetize_abortion for specification.""" """See _Packetizer.packetize_abortion for specification."""
if kind in _BACK_TO_FRONT_NO_TRANSMISSION_KINDS: if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES:
return None return None
else: else:
kind = _ABORTION_OUTCOME_TO_PACKET_KIND[outcome]
return packets.BackToFrontPacket( return packets.BackToFrontPacket(
operation_id, sequence_number, kind, None) operation_id, sequence_number, kind, None)
@ -178,7 +190,7 @@ class _EmptyTransmissionManager(TransmissionManager):
def inmit(self, emission, complete): def inmit(self, emission, complete):
"""See _interfaces.TransmissionManager.inmit for specification.""" """See _interfaces.TransmissionManager.inmit for specification."""
def abort(self, category): def abort(self, outcome):
"""See _interfaces.TransmissionManager.abort for specification.""" """See _interfaces.TransmissionManager.abort for specification."""
@ -212,7 +224,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
self._emissions = [] self._emissions = []
self._emission_complete = False self._emission_complete = False
self._kind = None self._outcome = None
self._lowest_unused_sequence_number = 0 self._lowest_unused_sequence_number = 0
self._transmitting = False self._transmitting = False
@ -239,17 +251,17 @@ class _TransmittingTransmissionManager(TransmissionManager):
return self._packetizer.packetize( return self._packetizer.packetize(
self._operation_id, sequence_number, emission, complete) self._operation_id, sequence_number, emission, complete)
def _abortive_response_packet(self, kind): def _abortive_response_packet(self, outcome):
"""Creates a packet indicating operation abortion. """Creates a packet indicating operation abortion.
Args: Args:
kind: One of the values of packets.Kind indicating operational abortion. outcome: An interfaces.Outcome value describing operation abortion.
Returns: Returns:
A packet indicating operation abortion. A packet indicating operation abortion.
""" """
packet = self._packetizer.packetize_abortion( packet = self._packetizer.packetize_abortion(
self._operation_id, self._lowest_unused_sequence_number, kind) self._operation_id, self._lowest_unused_sequence_number, outcome)
if packet is None: if packet is None:
return None return None
else: else:
@ -267,7 +279,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
""" """
if self._emissions is None: if self._emissions is None:
return False, None return False, None
elif self._kind is None: elif self._outcome is None:
if self._emissions: if self._emissions:
payload = self._emissions.pop(0) payload = self._emissions.pop(0)
complete = self._emission_complete and not self._emissions complete = self._emission_complete and not self._emissions
@ -278,7 +290,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
else: else:
return self._emission_complete, None return self._emission_complete, None
else: else:
packet = self._abortive_response_packet(self._kind) packet = self._abortive_response_packet(self._outcome)
self._emissions = None self._emissions = None
return False, None if packet is None else packet return False, None if packet is None else packet
@ -303,7 +315,8 @@ class _TransmittingTransmissionManager(TransmissionManager):
else: else:
with self._lock: with self._lock:
self._emissions = None self._emissions = None
self._termination_manager.abort(packets.Kind.TRANSMISSION_FAILURE) self._termination_manager.abort(
interfaces.Outcome.TRANSMISSION_FAILURE)
self._ingestion_manager.abort() self._ingestion_manager.abort()
self._expiration_manager.abort() self._expiration_manager.abort()
self._transmitting = False self._transmitting = False
@ -315,19 +328,19 @@ class _TransmittingTransmissionManager(TransmissionManager):
def inmit(self, emission, complete): def inmit(self, emission, complete):
"""See _interfaces.TransmissionManager.inmit for specification.""" """See _interfaces.TransmissionManager.inmit for specification."""
if self._emissions is not None and self._kind is None: if self._emissions is not None and self._outcome is None:
self._emission_complete = complete self._emission_complete = complete
if self._transmitting: if self._transmitting:
self._emissions.append(emission) self._emissions.append(emission)
else: else:
self._transmit(self._lead_packet(emission, complete)) self._transmit(self._lead_packet(emission, complete))
def abort(self, kind): def abort(self, outcome):
"""See _interfaces.TransmissionManager.abort for specification.""" """See _interfaces.TransmissionManager.abort for specification."""
if self._emissions is not None and self._kind is None: if self._emissions is not None and self._outcome is None:
self._kind = kind self._outcome = outcome
if not self._transmitting: if not self._transmitting:
packet = self._abortive_response_packet(kind) packet = self._abortive_response_packet(outcome)
self._emissions = None self._emissions = None
if packet is not None: if packet is not None:
self._transmit(packet) self._transmit(packet)

Loading…
Cancel
Save