Change remaining "packet" names to "ticket"

This renaming has been incrementally happening for the last several
weeks and this change finishes it.
pull/1073/head
Nathaniel Manista 10 years ago
parent 492dfdb950
commit de5490c15c
  1. 56
      src/python/src/grpc/_adapter/_links_test.py
  2. 14
      src/python/src/grpc/_adapter/_lonely_rear_link_test.py
  3. 32
      src/python/src/grpc/_adapter/fore.py
  4. 32
      src/python/src/grpc/_adapter/rear.py
  5. 12
      src/python/src/grpc/framework/base/_ends.py
  6. 8
      src/python/src/grpc/framework/base/_interfaces.py
  7. 208
      src/python/src/grpc/framework/base/_reception.py
  8. 190
      src/python/src/grpc/framework/base/_transmission.py
  9. 2
      src/python/src/grpc/framework/base/implementations.py
  10. 2
      src/python/src/grpc/framework/base/in_memory.py
  11. 26
      src/python/src/grpc/framework/base/interfaces.py
  12. 24
      src/python/src/grpc/framework/base/interfaces_test_case.py

@ -59,11 +59,11 @@ class RoundTripTest(unittest.TestCase):
test_fore_link = _test_links.ForeLink(None, None)
def rear_action(front_to_back_ticket, fore_link):
if front_to_back_ticket.kind in (
interfaces.FrontToBackPacket.Kind.COMPLETION,
interfaces.FrontToBackPacket.Kind.ENTIRE):
back_to_front_ticket = interfaces.BackToFrontPacket(
interfaces.FrontToBackTicket.Kind.COMPLETION,
interfaces.FrontToBackTicket.Kind.ENTIRE):
back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, 0,
interfaces.BackToFrontPacket.Kind.COMPLETION, None)
interfaces.BackToFrontTicket.Kind.COMPLETION, None)
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)
@ -81,8 +81,8 @@ class RoundTripTest(unittest.TestCase):
test_fore_link.join_rear_link(rear_link)
rear_link.start()
front_to_back_ticket = interfaces.FrontToBackPacket(
test_operation_id, 0, interfaces.FrontToBackPacket.Kind.ENTIRE,
front_to_back_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
test_method, interfaces.ServicedSubscription.Kind.FULL, None, None,
_TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
@ -90,7 +90,7 @@ class RoundTripTest(unittest.TestCase):
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is
interfaces.BackToFrontPacket.Kind.CONTINUATION):
interfaces.BackToFrontTicket.Kind.CONTINUATION):
test_fore_link.condition.wait()
rear_link.stop()
@ -99,7 +99,7 @@ class RoundTripTest(unittest.TestCase):
with test_fore_link.condition:
self.assertIs(
test_fore_link.tickets[-1].kind,
interfaces.BackToFrontPacket.Kind.COMPLETION)
interfaces.BackToFrontTicket.Kind.COMPLETION)
def testEntireRoundTrip(self):
test_operation_id = object()
@ -114,14 +114,14 @@ class RoundTripTest(unittest.TestCase):
else:
payload = test_back_to_front_datum
terminal = front_to_back_ticket.kind in (
interfaces.FrontToBackPacket.Kind.COMPLETION,
interfaces.FrontToBackPacket.Kind.ENTIRE)
interfaces.FrontToBackTicket.Kind.COMPLETION,
interfaces.FrontToBackTicket.Kind.ENTIRE)
if payload is not None or terminal:
if terminal:
kind = interfaces.BackToFrontPacket.Kind.COMPLETION
kind = interfaces.BackToFrontTicket.Kind.COMPLETION
else:
kind = interfaces.BackToFrontPacket.Kind.CONTINUATION
back_to_front_ticket = interfaces.BackToFrontPacket(
kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
payload)
rear_sequence_number[0] += 1
@ -143,8 +143,8 @@ class RoundTripTest(unittest.TestCase):
test_fore_link.join_rear_link(rear_link)
rear_link.start()
front_to_back_ticket = interfaces.FrontToBackPacket(
test_operation_id, 0, interfaces.FrontToBackPacket.Kind.ENTIRE,
front_to_back_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
test_method, interfaces.ServicedSubscription.Kind.FULL, None,
test_front_to_back_datum, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
@ -152,7 +152,7 @@ class RoundTripTest(unittest.TestCase):
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not
interfaces.BackToFrontPacket.Kind.COMPLETION):
interfaces.BackToFrontTicket.Kind.COMPLETION):
test_fore_link.condition.wait()
rear_link.stop()
@ -182,14 +182,14 @@ class RoundTripTest(unittest.TestCase):
else:
response = None
terminal = front_to_back_ticket.kind in (
interfaces.FrontToBackPacket.Kind.COMPLETION,
interfaces.FrontToBackPacket.Kind.ENTIRE)
interfaces.FrontToBackTicket.Kind.COMPLETION,
interfaces.FrontToBackTicket.Kind.ENTIRE)
if response is not None or terminal:
if terminal:
kind = interfaces.BackToFrontPacket.Kind.COMPLETION
kind = interfaces.BackToFrontTicket.Kind.COMPLETION
else:
kind = interfaces.BackToFrontPacket.Kind.CONTINUATION
back_to_front_ticket = interfaces.BackToFrontPacket(
kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
response)
rear_sequence_number[0] += 1
@ -212,23 +212,23 @@ class RoundTripTest(unittest.TestCase):
test_fore_link.join_rear_link(rear_link)
rear_link.start()
commencement_ticket = interfaces.FrontToBackPacket(
commencement_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0,
interfaces.FrontToBackPacket.Kind.COMMENCEMENT, test_method,
interfaces.FrontToBackTicket.Kind.COMMENCEMENT, test_method,
interfaces.ServicedSubscription.Kind.FULL, None, None,
_TIMEOUT)
fore_sequence_number = 1
rear_link.accept_front_to_back_ticket(commencement_ticket)
for request in scenario.requests():
continuation_ticket = interfaces.FrontToBackPacket(
continuation_ticket = interfaces.FrontToBackTicket(
test_operation_id, fore_sequence_number,
interfaces.FrontToBackPacket.Kind.CONTINUATION, None, None, None,
interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, None,
request, None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(continuation_ticket)
completion_ticket = interfaces.FrontToBackPacket(
completion_ticket = interfaces.FrontToBackTicket(
test_operation_id, fore_sequence_number,
interfaces.FrontToBackPacket.Kind.COMPLETION, None, None, None, None,
interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, None,
None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(completion_ticket)
@ -236,7 +236,7 @@ class RoundTripTest(unittest.TestCase):
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not
interfaces.BackToFrontPacket.Kind.COMPLETION):
interfaces.BackToFrontTicket.Kind.COMPLETION):
test_fore_link.condition.wait()
rear_link.stop()

@ -67,7 +67,7 @@ class LonelyRearLinkTest(unittest.TestCase):
rear_link.join_fore_link(fore_link)
rear_link.start()
front_to_back_ticket = interfaces.FrontToBackPacket(
front_to_back_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0, front_to_back_ticket_kind, test_method,
interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
@ -76,7 +76,7 @@ class LonelyRearLinkTest(unittest.TestCase):
while True:
if (fore_link.tickets and
fore_link.tickets[-1].kind is not
interfaces.BackToFrontPacket.Kind.CONTINUATION):
interfaces.BackToFrontTicket.Kind.CONTINUATION):
break
fore_link.condition.wait()
@ -85,15 +85,15 @@ class LonelyRearLinkTest(unittest.TestCase):
with fore_link.condition:
self.assertIsNot(
fore_link.tickets[-1].kind,
interfaces.BackToFrontPacket.Kind.COMPLETION)
interfaces.BackToFrontTicket.Kind.COMPLETION)
def testLonelyClientCommencementPacket(self):
def testLonelyClientCommencementTicket(self):
self._perform_lonely_client_test_with_ticket_kind(
interfaces.FrontToBackPacket.Kind.COMMENCEMENT)
interfaces.FrontToBackTicket.Kind.COMMENCEMENT)
def testLonelyClientEntirePacket(self):
def testLonelyClientEntireTicket(self):
self._perform_lonely_client_test_with_ticket_kind(
interfaces.FrontToBackPacket.Kind.ENTIRE)
interfaces.FrontToBackTicket.Kind.ENTIRE)
if __name__ == '__main__':

@ -125,8 +125,8 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated):
self._request_deserializers[method],
self._response_serializers[method])
ticket = base_interfaces.FrontToBackPacket(
call, 0, base_interfaces.FrontToBackPacket.Kind.COMMENCEMENT, method,
ticket = base_interfaces.FrontToBackTicket(
call, 0, base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT, method,
base_interfaces.ServicedSubscription.Kind.FULL, None, None,
service_acceptance.deadline - time.time())
self._rear_link.accept_front_to_back_ticket(ticket)
@ -143,15 +143,15 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated):
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
if event.bytes is None:
ticket = base_interfaces.FrontToBackPacket(
ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
base_interfaces.FrontToBackPacket.Kind.COMPLETION, None, None, None,
base_interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None,
None, None)
else:
call.read(call)
ticket = base_interfaces.FrontToBackPacket(
ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
base_interfaces.FrontToBackPacket.Kind.CONTINUATION, None, None,
base_interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None,
None, rpc_state.deserializer(event.bytes), None)
self._rear_link.accept_front_to_back_ticket(ticket)
@ -180,9 +180,9 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated):
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
ticket = base_interfaces.FrontToBackPacket(
ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
base_interfaces.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, None,
base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None,
None, None, None, None)
self._rear_link.accept_front_to_back_ticket(ticket)
@ -200,20 +200,20 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated):
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
if code is _low.Code.CANCELLED:
ticket = base_interfaces.FrontToBackPacket(
ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
base_interfaces.FrontToBackPacket.Kind.CANCELLATION, None, None,
base_interfaces.FrontToBackTicket.Kind.CANCELLATION, None, None,
None, None, None)
elif code is _low.Code.EXPIRED:
ticket = base_interfaces.FrontToBackPacket(
ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
base_interfaces.FrontToBackPacket.Kind.EXPIRATION, None, None, None,
base_interfaces.FrontToBackTicket.Kind.EXPIRATION, None, None, None,
None, None)
else:
# TODO(nathaniel): Better mapping of codes to ticket-categories
ticket = base_interfaces.FrontToBackPacket(
ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
base_interfaces.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, None,
base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None,
None, None, None, None)
self._rear_link.accept_front_to_back_ticket(ticket)
@ -355,9 +355,9 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated):
if self._server is None:
return
if ticket.kind is base_interfaces.BackToFrontPacket.Kind.CONTINUATION:
if ticket.kind is base_interfaces.BackToFrontTicket.Kind.CONTINUATION:
self._continue(ticket.operation_id, ticket.payload)
elif ticket.kind is base_interfaces.BackToFrontPacket.Kind.COMPLETION:
elif ticket.kind is base_interfaces.BackToFrontTicket.Kind.COMPLETION:
self._complete(ticket.operation_id, ticket.payload)
else:
self._cancel(ticket.operation_id)

@ -151,9 +151,9 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
else:
logging.error('RPC write not accepted! Event: %s', (event,))
rpc_state.active = False
ticket = base_interfaces.BackToFrontPacket(
ticket = base_interfaces.BackToFrontTicket(
operation_id, rpc_state.common.sequence_number,
base_interfaces.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, None)
base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
@ -162,9 +162,9 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
rpc_state.call.read(operation_id)
rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED)
ticket = base_interfaces.BackToFrontPacket(
ticket = base_interfaces.BackToFrontTicket(
operation_id, rpc_state.common.sequence_number,
base_interfaces.BackToFrontPacket.Kind.CONTINUATION,
base_interfaces.BackToFrontTicket.Kind.CONTINUATION,
rpc_state.common.deserializer(event.bytes))
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
@ -173,9 +173,9 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
if not event.complete_accepted:
logging.error('RPC complete not accepted! Event: %s', (event,))
rpc_state.active = False
ticket = base_interfaces.BackToFrontPacket(
ticket = base_interfaces.BackToFrontTicket(
operation_id, rpc_state.common.sequence_number,
base_interfaces.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, None)
base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
@ -188,14 +188,14 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
"""Handle termination of an RPC."""
# TODO(nathaniel): Cover all statuses.
if event.status.code is _low.Code.OK:
kind = base_interfaces.BackToFrontPacket.Kind.COMPLETION
kind = base_interfaces.BackToFrontTicket.Kind.COMPLETION
elif event.status.code is _low.Code.CANCELLED:
kind = base_interfaces.BackToFrontPacket.Kind.CANCELLATION
kind = base_interfaces.BackToFrontTicket.Kind.CANCELLATION
elif event.status.code is _low.Code.EXPIRED:
kind = base_interfaces.BackToFrontPacket.Kind.EXPIRATION
kind = base_interfaces.BackToFrontTicket.Kind.EXPIRATION
else:
kind = base_interfaces.BackToFrontPacket.Kind.TRANSMISSION_FAILURE
ticket = base_interfaces.BackToFrontPacket(
kind = base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE
ticket = base_interfaces.BackToFrontTicket(
operation_id, rpc_state.common.sequence_number, kind, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
@ -370,17 +370,17 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
if self._completion_queue is None:
return
if ticket.kind is base_interfaces.FrontToBackPacket.Kind.COMMENCEMENT:
if ticket.kind is base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT:
self._commence(
ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
elif ticket.kind is base_interfaces.FrontToBackPacket.Kind.CONTINUATION:
elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.CONTINUATION:
self._continue(ticket.operation_id, ticket.payload)
elif ticket.kind is base_interfaces.FrontToBackPacket.Kind.COMPLETION:
elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.COMPLETION:
self._complete(ticket.operation_id, ticket.payload)
elif ticket.kind is base_interfaces.FrontToBackPacket.Kind.ENTIRE:
elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.ENTIRE:
self._entire(
ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
elif ticket.kind is base_interfaces.FrontToBackPacket.Kind.CANCELLATION:
elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.CANCELLATION:
self._cancel(ticket.operation_id)
else:
# NOTE(nathaniel): All other categories are treated as cancellation.

@ -150,7 +150,7 @@ def _front_operate(
"""Constructs objects necessary for front-side operation management.
Args:
callback: A callable that accepts interfaces.FrontToBackPackets and
callback: A callable that accepts interfaces.FrontToBackTickets and
delivers them to the other side of the operation. Execution of this
callable may take any arbitrary length of time.
work_pool: A thread pool in which to execute customer code.
@ -276,7 +276,7 @@ class FrontLink(interfaces.FrontLink):
with self._endlette:
reception_manager = self._endlette.get_operation(ticket.operation_id)
if reception_manager:
reception_manager.receive_packet(ticket)
reception_manager.receive_ticket(ticket)
def _back_operate(
@ -289,7 +289,7 @@ def _back_operate(
Args:
servicer: An interfaces.Servicer for servicing operations.
callback: A callable that accepts interfaces.BackToFrontPackets and
callback: A callable that accepts interfaces.BackToFrontTickets and
delivers them to the other side of the operation. Execution of this
callable may take any arbitrary length of time.
work_pool: A thread pool in which to execute customer code.
@ -298,7 +298,7 @@ def _back_operate(
utility_pool: A thread pool for utility tasks.
termination_action: A no-arg behavior to be called upon operation
completion.
ticket: The first interfaces.FrontToBackPacket received for the operation.
ticket: The first interfaces.FrontToBackTicket received for the operation.
default_timeout: A length of time in seconds to be used as the default
time alloted for a single operation.
maximum_timeout: A length of time in seconds to be used as the maximum
@ -338,7 +338,7 @@ def _back_operate(
ingestion_manager, expiration_manager)
ingestion_manager.set_expiration_manager(expiration_manager)
reception_manager.receive_packet(ticket)
reception_manager.receive_ticket(ticket)
return reception_manager
@ -388,7 +388,7 @@ class BackLink(interfaces.BackLink):
self._default_timeout, self._maximum_timeout)
self._endlette.add_operation(ticket.operation_id, reception_manager)
else:
reception_manager.receive_packet(ticket)
reception_manager.receive_ticket(ticket)
def operation_stats(self):
"""See interfaces.End.operation_stats for specification."""

@ -247,15 +247,15 @@ class ExpirationManager(object):
class ReceptionManager(object):
"""A manager responsible for receiving packets from the other end."""
"""A manager responsible for receiving tickets from the other end."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def receive_packet(self, packet):
"""Handle a packet from the other side of the operation.
def receive_ticket(self, ticket):
"""Handle a ticket from the other side of the operation.
Args:
packet: An interfaces.BackToFrontPacket or interfaces.FrontToBackPacket
ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
appropriate to this end of the operation and this object.
"""
raise NotImplementedError()

@ -27,46 +27,46 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for packet reception."""
"""State and behavior for ticket reception."""
import abc
from grpc.framework.base import interfaces
from grpc.framework.base import _interfaces
_INITIAL_FRONT_TO_BACK_PACKET_KINDS = (
interfaces.FrontToBackPacket.Kind.COMMENCEMENT,
interfaces.FrontToBackPacket.Kind.ENTIRE,
_INITIAL_FRONT_TO_BACK_TICKET_KINDS = (
interfaces.FrontToBackTicket.Kind.COMMENCEMENT,
interfaces.FrontToBackTicket.Kind.ENTIRE,
)
class _Receiver(object):
"""Common specification of different packet-handling behavior."""
"""Common specification of different ticket-handling behavior."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def abort_if_abortive(self, packet):
"""Aborts the operation if the packet is abortive.
def abort_if_abortive(self, ticket):
"""Aborts the operation if the ticket is abortive.
Args:
packet: A just-arrived packet.
ticket: A just-arrived ticket.
Returns:
A boolean indicating whether or not this Receiver aborted the operation
based on the packet.
based on the ticket.
"""
raise NotImplementedError()
@abc.abstractmethod
def receive(self, packet):
"""Handles a just-arrived packet.
def receive(self, ticket):
"""Handles a just-arrived ticket.
Args:
packet: A just-arrived packet.
ticket: A just-arrived ticket.
Returns:
A boolean indicating whether or not the packet was terminal (i.e. whether
or not non-abortive packets are legal after this one).
A boolean indicating whether or not the ticket was terminal (i.e. whether
or not non-abortive tickets are legal after this one).
"""
raise NotImplementedError()
@ -87,13 +87,13 @@ def _abort(
def _abort_if_abortive(
packet, abortive, termination_manager, transmission_manager,
ticket, abortive, termination_manager, transmission_manager,
ingestion_manager, expiration_manager):
"""Determines a packet's being abortive and if so aborts the operation.
"""Determines a ticket's being abortive and if so aborts the operation.
Args:
packet: A just-arrived packet.
abortive: A callable that takes a packet and returns an interfaces.Outcome
ticket: A just-arrived ticket.
abortive: A callable that takes a ticket and returns an interfaces.Outcome
indicating that the operation should be aborted or None indicating that
the operation should not be aborted.
termination_manager: The operation's _interfaces.TerminationManager.
@ -104,7 +104,7 @@ def _abort_if_abortive(
Returns:
True if the operation was aborted; False otherwise.
"""
abortion_outcome = abortive(packet)
abortion_outcome = abortive(ticket)
if abortion_outcome is None:
return False
else:
@ -124,7 +124,7 @@ def _reception_failure(
class _BackReceiver(_Receiver):
"""Packet-handling specific to the back side of an operation."""
"""Ticket-handling specific to the back side of an operation."""
def __init__(
self, termination_manager, transmission_manager, ingestion_manager,
@ -142,68 +142,68 @@ class _BackReceiver(_Receiver):
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
self._first_packet_seen = False
self._last_packet_seen = False
self._first_ticket_seen = False
self._last_ticket_seen = False
def _abortive(self, packet):
"""Determines whether or not (and if so, how) a packet is abortive.
def _abortive(self, ticket):
"""Determines whether or not (and if so, how) a ticket is abortive.
Args:
packet: A just-arrived packet.
ticket: A just-arrived ticket.
Returns:
An interfaces.Outcome value describing operation abortion if the
packet is abortive or None if the packet is not abortive.
ticket is abortive or None if the ticket is not abortive.
"""
if packet.kind is interfaces.FrontToBackPacket.Kind.CANCELLATION:
if ticket.kind is interfaces.FrontToBackTicket.Kind.CANCELLATION:
return interfaces.Outcome.CANCELLED
elif packet.kind is interfaces.FrontToBackPacket.Kind.EXPIRATION:
elif ticket.kind is interfaces.FrontToBackTicket.Kind.EXPIRATION:
return interfaces.Outcome.EXPIRED
elif packet.kind is interfaces.FrontToBackPacket.Kind.SERVICED_FAILURE:
elif ticket.kind is interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE:
return interfaces.Outcome.SERVICED_FAILURE
elif packet.kind is interfaces.FrontToBackPacket.Kind.RECEPTION_FAILURE:
elif ticket.kind is interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE:
return interfaces.Outcome.SERVICED_FAILURE
elif (packet.kind in _INITIAL_FRONT_TO_BACK_PACKET_KINDS and
self._first_packet_seen):
elif (ticket.kind in _INITIAL_FRONT_TO_BACK_TICKET_KINDS and
self._first_ticket_seen):
return interfaces.Outcome.RECEPTION_FAILURE
elif self._last_packet_seen:
elif self._last_ticket_seen:
return interfaces.Outcome.RECEPTION_FAILURE
else:
return None
def abort_if_abortive(self, packet):
def abort_if_abortive(self, ticket):
"""See _Receiver.abort_if_abortive for specification."""
return _abort_if_abortive(
packet, self._abortive, self._termination_manager,
ticket, self._abortive, self._termination_manager,
self._transmission_manager, self._ingestion_manager,
self._expiration_manager)
def receive(self, packet):
def receive(self, ticket):
"""See _Receiver.receive for specification."""
if packet.timeout is not None:
self._expiration_manager.change_timeout(packet.timeout)
if packet.kind is interfaces.FrontToBackPacket.Kind.COMMENCEMENT:
self._first_packet_seen = True
self._ingestion_manager.start(packet.name)
if packet.payload is not None:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is interfaces.FrontToBackPacket.Kind.CONTINUATION:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is interfaces.FrontToBackPacket.Kind.COMPLETION:
self._last_packet_seen = True
if packet.payload is None:
if ticket.timeout is not None:
self._expiration_manager.change_timeout(ticket.timeout)
if ticket.kind is interfaces.FrontToBackTicket.Kind.COMMENCEMENT:
self._first_ticket_seen = True
self._ingestion_manager.start(ticket.name)
if ticket.payload is not None:
self._ingestion_manager.consume(ticket.payload)
elif ticket.kind is interfaces.FrontToBackTicket.Kind.CONTINUATION:
self._ingestion_manager.consume(ticket.payload)
elif ticket.kind is interfaces.FrontToBackTicket.Kind.COMPLETION:
self._last_ticket_seen = True
if ticket.payload is None:
self._ingestion_manager.terminate()
else:
self._ingestion_manager.consume_and_terminate(packet.payload)
self._ingestion_manager.consume_and_terminate(ticket.payload)
else:
self._first_packet_seen = True
self._last_packet_seen = True
self._ingestion_manager.start(packet.name)
if packet.payload is None:
self._first_ticket_seen = True
self._last_ticket_seen = True
self._ingestion_manager.start(ticket.name)
if ticket.payload is None:
self._ingestion_manager.terminate()
else:
self._ingestion_manager.consume_and_terminate(packet.payload)
self._ingestion_manager.consume_and_terminate(ticket.payload)
def reception_failure(self):
"""See _Receiver.reception_failure for specification."""
@ -213,7 +213,7 @@ class _BackReceiver(_Receiver):
class _FrontReceiver(_Receiver):
"""Packet-handling specific to the front side of an operation."""
"""Ticket-handling specific to the front side of an operation."""
def __init__(
self, termination_manager, transmission_manager, ingestion_manager,
@ -231,48 +231,48 @@ class _FrontReceiver(_Receiver):
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
self._last_packet_seen = False
self._last_ticket_seen = False
def _abortive(self, packet):
"""Determines whether or not (and if so, how) a packet is abortive.
def _abortive(self, ticket):
"""Determines whether or not (and if so, how) a ticket is abortive.
Args:
packet: A just-arrived packet.
ticket: A just-arrived ticket.
Returns:
An interfaces.Outcome value describing operation abortion if the packet
is abortive or None if the packet is not abortive.
An interfaces.Outcome value describing operation abortion if the ticket
is abortive or None if the ticket is not abortive.
"""
if packet.kind is interfaces.BackToFrontPacket.Kind.CANCELLATION:
if ticket.kind is interfaces.BackToFrontTicket.Kind.CANCELLATION:
return interfaces.Outcome.CANCELLED
elif packet.kind is interfaces.BackToFrontPacket.Kind.EXPIRATION:
elif ticket.kind is interfaces.BackToFrontTicket.Kind.EXPIRATION:
return interfaces.Outcome.EXPIRED
elif packet.kind is interfaces.BackToFrontPacket.Kind.SERVICER_FAILURE:
elif ticket.kind is interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE:
return interfaces.Outcome.SERVICER_FAILURE
elif packet.kind is interfaces.BackToFrontPacket.Kind.RECEPTION_FAILURE:
elif ticket.kind is interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE:
return interfaces.Outcome.SERVICER_FAILURE
elif self._last_packet_seen:
elif self._last_ticket_seen:
return interfaces.Outcome.RECEPTION_FAILURE
else:
return None
def abort_if_abortive(self, packet):
def abort_if_abortive(self, ticket):
"""See _Receiver.abort_if_abortive for specification."""
return _abort_if_abortive(
packet, self._abortive, self._termination_manager,
ticket, self._abortive, self._termination_manager,
self._transmission_manager, self._ingestion_manager,
self._expiration_manager)
def receive(self, packet):
def receive(self, ticket):
"""See _Receiver.receive for specification."""
if packet.kind is interfaces.BackToFrontPacket.Kind.CONTINUATION:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is interfaces.BackToFrontPacket.Kind.COMPLETION:
self._last_packet_seen = True
if packet.payload is None:
if ticket.kind is interfaces.BackToFrontTicket.Kind.CONTINUATION:
self._ingestion_manager.consume(ticket.payload)
elif ticket.kind is interfaces.BackToFrontTicket.Kind.COMPLETION:
self._last_ticket_seen = True
if ticket.payload is None:
self._ingestion_manager.terminate()
else:
self._ingestion_manager.consume_and_terminate(packet.payload)
self._ingestion_manager.consume_and_terminate(ticket.payload)
def reception_failure(self):
"""See _Receiver.reception_failure for specification."""
@ -289,72 +289,72 @@ class _ReceptionManager(_interfaces.ReceptionManager):
Args:
lock: The operation-servicing-wide lock object.
receiver: A _Receiver responsible for handling received packets.
receiver: A _Receiver responsible for handling received tickets.
"""
self._lock = lock
self._receiver = receiver
self._lowest_unseen_sequence_number = 0
self._out_of_sequence_packets = {}
self._out_of_sequence_tickets = {}
self._completed_sequence_number = None
self._aborted = False
def _sequence_failure(self, packet):
"""Determines a just-arrived packet's sequential legitimacy.
def _sequence_failure(self, ticket):
"""Determines a just-arrived ticket's sequential legitimacy.
Args:
packet: A just-arrived packet.
ticket: A just-arrived ticket.
Returns:
True if the packet is sequentially legitimate; False otherwise.
True if the ticket is sequentially legitimate; False otherwise.
"""
if packet.sequence_number < self._lowest_unseen_sequence_number:
if ticket.sequence_number < self._lowest_unseen_sequence_number:
return True
elif packet.sequence_number in self._out_of_sequence_packets:
elif ticket.sequence_number in self._out_of_sequence_tickets:
return True
elif (self._completed_sequence_number is not None and
self._completed_sequence_number <= packet.sequence_number):
self._completed_sequence_number <= ticket.sequence_number):
return True
else:
return False
def _process(self, packet):
"""Process those packets ready to be processed.
def _process(self, ticket):
"""Process those tickets ready to be processed.
Args:
packet: A just-arrived packet the sequence number of which matches this
ticket: A just-arrived ticket the sequence number of which matches this
_ReceptionManager's _lowest_unseen_sequence_number field.
"""
while True:
completed = self._receiver.receive(packet)
completed = self._receiver.receive(ticket)
if completed:
self._out_of_sequence_packets.clear()
self._completed_sequence_number = packet.sequence_number
self._lowest_unseen_sequence_number = packet.sequence_number + 1
self._out_of_sequence_tickets.clear()
self._completed_sequence_number = ticket.sequence_number
self._lowest_unseen_sequence_number = ticket.sequence_number + 1
return
else:
next_packet = self._out_of_sequence_packets.pop(
packet.sequence_number + 1, None)
if next_packet is None:
self._lowest_unseen_sequence_number = packet.sequence_number + 1
next_ticket = self._out_of_sequence_tickets.pop(
ticket.sequence_number + 1, None)
if next_ticket is None:
self._lowest_unseen_sequence_number = ticket.sequence_number + 1
return
else:
packet = next_packet
ticket = next_ticket
def receive_packet(self, packet):
"""See _interfaces.ReceptionManager.receive_packet for specification."""
def receive_ticket(self, ticket):
"""See _interfaces.ReceptionManager.receive_ticket for specification."""
with self._lock:
if self._aborted:
return
elif self._sequence_failure(packet):
elif self._sequence_failure(ticket):
self._receiver.reception_failure()
self._aborted = True
elif self._receiver.abort_if_abortive(packet):
elif self._receiver.abort_if_abortive(ticket):
self._aborted = True
elif packet.sequence_number == self._lowest_unseen_sequence_number:
self._process(packet)
elif ticket.sequence_number == self._lowest_unseen_sequence_number:
self._process(ticket)
else:
self._out_of_sequence_packets[packet.sequence_number] = packet
self._out_of_sequence_tickets[ticket.sequence_number] = ticket
def front_reception_manager(

@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for packet transmission during an operation."""
"""State and behavior for ticket transmission during an operation."""
import abc
@ -46,53 +46,53 @@ _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = (
interfaces.Outcome.SERVICED_FAILURE,
)
_ABORTION_OUTCOME_TO_FRONT_TO_BACK_PACKET_KIND = {
_ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND = {
interfaces.Outcome.CANCELLED:
interfaces.FrontToBackPacket.Kind.CANCELLATION,
interfaces.FrontToBackTicket.Kind.CANCELLATION,
interfaces.Outcome.EXPIRED:
interfaces.FrontToBackPacket.Kind.EXPIRATION,
interfaces.FrontToBackTicket.Kind.EXPIRATION,
interfaces.Outcome.RECEPTION_FAILURE:
interfaces.FrontToBackPacket.Kind.RECEPTION_FAILURE,
interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE,
interfaces.Outcome.TRANSMISSION_FAILURE:
interfaces.FrontToBackPacket.Kind.TRANSMISSION_FAILURE,
interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE,
interfaces.Outcome.SERVICED_FAILURE:
interfaces.FrontToBackPacket.Kind.SERVICED_FAILURE,
interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE,
interfaces.Outcome.SERVICER_FAILURE:
interfaces.FrontToBackPacket.Kind.SERVICER_FAILURE,
interfaces.FrontToBackTicket.Kind.SERVICER_FAILURE,
}
_ABORTION_OUTCOME_TO_BACK_TO_FRONT_PACKET_KIND = {
_ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND = {
interfaces.Outcome.CANCELLED:
interfaces.BackToFrontPacket.Kind.CANCELLATION,
interfaces.BackToFrontTicket.Kind.CANCELLATION,
interfaces.Outcome.EXPIRED:
interfaces.BackToFrontPacket.Kind.EXPIRATION,
interfaces.BackToFrontTicket.Kind.EXPIRATION,
interfaces.Outcome.RECEPTION_FAILURE:
interfaces.BackToFrontPacket.Kind.RECEPTION_FAILURE,
interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE,
interfaces.Outcome.TRANSMISSION_FAILURE:
interfaces.BackToFrontPacket.Kind.TRANSMISSION_FAILURE,
interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE,
interfaces.Outcome.SERVICED_FAILURE:
interfaces.BackToFrontPacket.Kind.SERVICED_FAILURE,
interfaces.BackToFrontTicket.Kind.SERVICED_FAILURE,
interfaces.Outcome.SERVICER_FAILURE:
interfaces.BackToFrontPacket.Kind.SERVICER_FAILURE,
interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE,
}
class _Packetizer(object):
"""Common specification of different packet-creating behavior."""
class _Ticketizer(object):
"""Common specification of different ticket-creating behavior."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def packetize(self, operation_id, sequence_number, payload, complete):
"""Creates a packet indicating ordinary operation progress.
def ticketize(self, operation_id, sequence_number, payload, complete):
"""Creates a ticket indicating ordinary operation progress.
Args:
operation_id: The operation ID for the current operation.
sequence_number: A sequence number for the packet.
sequence_number: A sequence number for the ticket.
payload: A customer payload object. May be None if sequence_number is
zero or complete is true.
complete: A boolean indicating whether or not the packet should describe
complete: A boolean indicating whether or not the ticket should describe
itself as (but for a later indication of operation abortion) the last
packet to be sent.
ticket to be sent.
Returns:
An object of an appropriate type suitable for transmission to the other
@ -101,12 +101,12 @@ class _Packetizer(object):
raise NotImplementedError()
@abc.abstractmethod
def packetize_abortion(self, operation_id, sequence_number, outcome):
"""Creates a packet indicating that the operation is aborted.
def ticketize_abortion(self, operation_id, sequence_number, outcome):
"""Creates a ticket indicating that the operation is aborted.
Args:
operation_id: The operation ID for the current operation.
sequence_number: A sequence number for the packet.
sequence_number: A sequence number for the ticket.
outcome: An interfaces.Outcome value describing the operation abortion.
Returns:
@ -117,8 +117,8 @@ class _Packetizer(object):
raise NotImplementedError()
class _FrontPacketizer(_Packetizer):
"""Front-side packet-creating behavior."""
class _FrontTicketizer(_Ticketizer):
"""Front-side ticket-creating behavior."""
def __init__(self, name, subscription_kind, trace_id, timeout):
"""Constructor.
@ -126,7 +126,7 @@ class _FrontPacketizer(_Packetizer):
Args:
name: The name of the operation.
subscription_kind: An interfaces.ServicedSubscription.Kind value
describing the interest the front has in packets sent from the back.
describing the interest the front has in tickets sent from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
@ -136,54 +136,54 @@ class _FrontPacketizer(_Packetizer):
self._trace_id = trace_id
self._timeout = timeout
def packetize(self, operation_id, sequence_number, payload, complete):
"""See _Packetizer.packetize for specification."""
def ticketize(self, operation_id, sequence_number, payload, complete):
"""See _Ticketizer.ticketize for specification."""
if sequence_number:
if complete:
kind = interfaces.FrontToBackPacket.Kind.COMPLETION
kind = interfaces.FrontToBackTicket.Kind.COMPLETION
else:
kind = interfaces.FrontToBackPacket.Kind.CONTINUATION
return interfaces.FrontToBackPacket(
kind = interfaces.FrontToBackTicket.Kind.CONTINUATION
return interfaces.FrontToBackTicket(
operation_id, sequence_number, kind, self._name,
self._subscription_kind, self._trace_id, payload, self._timeout)
else:
if complete:
kind = interfaces.FrontToBackPacket.Kind.ENTIRE
kind = interfaces.FrontToBackTicket.Kind.ENTIRE
else:
kind = interfaces.FrontToBackPacket.Kind.COMMENCEMENT
return interfaces.FrontToBackPacket(
kind = interfaces.FrontToBackTicket.Kind.COMMENCEMENT
return interfaces.FrontToBackTicket(
operation_id, 0, kind, self._name, self._subscription_kind,
self._trace_id, payload, self._timeout)
def packetize_abortion(self, operation_id, sequence_number, outcome):
"""See _Packetizer.packetize_abortion for specification."""
def ticketize_abortion(self, operation_id, sequence_number, outcome):
"""See _Ticketizer.ticketize_abortion for specification."""
if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES:
return None
else:
kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_PACKET_KIND[outcome]
return interfaces.FrontToBackPacket(
kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND[outcome]
return interfaces.FrontToBackTicket(
operation_id, sequence_number, kind, None, None, None, None, None)
class _BackPacketizer(_Packetizer):
"""Back-side packet-creating behavior."""
class _BackTicketizer(_Ticketizer):
"""Back-side ticket-creating behavior."""
def packetize(self, operation_id, sequence_number, payload, complete):
"""See _Packetizer.packetize for specification."""
def ticketize(self, operation_id, sequence_number, payload, complete):
"""See _Ticketizer.ticketize for specification."""
if complete:
kind = interfaces.BackToFrontPacket.Kind.COMPLETION
kind = interfaces.BackToFrontTicket.Kind.COMPLETION
else:
kind = interfaces.BackToFrontPacket.Kind.CONTINUATION
return interfaces.BackToFrontPacket(
kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
return interfaces.BackToFrontTicket(
operation_id, sequence_number, kind, payload)
def packetize_abortion(self, operation_id, sequence_number, outcome):
"""See _Packetizer.packetize_abortion for specification."""
def ticketize_abortion(self, operation_id, sequence_number, outcome):
"""See _Ticketizer.ticketize_abortion for specification."""
if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES:
return None
else:
kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_PACKET_KIND[outcome]
return interfaces.BackToFrontPacket(
kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND[outcome]
return interfaces.BackToFrontTicket(
operation_id, sequence_number, kind, None)
@ -220,21 +220,21 @@ class _EmptyTransmissionManager(TransmissionManager):
class _TransmittingTransmissionManager(TransmissionManager):
"""A TransmissionManager implementation that sends packets."""
"""A TransmissionManager implementation that sends tickets."""
def __init__(
self, lock, pool, callback, operation_id, packetizer,
self, lock, pool, callback, operation_id, ticketizer,
termination_manager):
"""Constructor.
Args:
lock: The operation-servicing-wide lock object.
pool: A thread pool in which the work of transmitting packets will be
pool: A thread pool in which the work of transmitting tickets will be
performed.
callback: A callable that accepts packets and sends them to the other side
callback: A callable that accepts tickets and sends them to the other side
of the operation.
operation_id: The operation's ID.
packetizer: A _Packetizer for packet creation.
ticketizer: A _Ticketizer for ticket creation.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
"""
@ -242,7 +242,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
self._pool = pool
self._callback = callback
self._operation_id = operation_id
self._packetizer = packetizer
self._ticketizer = ticketizer
self._termination_manager = termination_manager
self._ingestion_manager = None
self._expiration_manager = None
@ -259,8 +259,8 @@ class _TransmittingTransmissionManager(TransmissionManager):
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
def _lead_packet(self, emission, complete):
"""Creates a packet suitable for leading off the transmission loop.
def _lead_ticket(self, emission, complete):
"""Creates a ticket suitable for leading off the transmission loop.
Args:
emission: A customer payload object to be sent to the other side of the
@ -269,37 +269,37 @@ class _TransmittingTransmissionManager(TransmissionManager):
the passed object.
Returns:
A packet with which to lead off the transmission loop.
A ticket with which to lead off the transmission loop.
"""
sequence_number = self._lowest_unused_sequence_number
self._lowest_unused_sequence_number += 1
return self._packetizer.packetize(
return self._ticketizer.ticketize(
self._operation_id, sequence_number, emission, complete)
def _abortive_response_packet(self, outcome):
"""Creates a packet indicating operation abortion.
def _abortive_response_ticket(self, outcome):
"""Creates a ticket indicating operation abortion.
Args:
outcome: An interfaces.Outcome value describing operation abortion.
Returns:
A packet indicating operation abortion.
A ticket indicating operation abortion.
"""
packet = self._packetizer.packetize_abortion(
ticket = self._ticketizer.ticketize_abortion(
self._operation_id, self._lowest_unused_sequence_number, outcome)
if packet is None:
if ticket is None:
return None
else:
self._lowest_unused_sequence_number += 1
return packet
return ticket
def _next_packet(self):
"""Creates the next packet to be sent to the other side of the operation.
def _next_ticket(self):
"""Creates the next ticket to be sent to the other side of the operation.
Returns:
A (completed, packet) tuple comprised of a boolean indicating whether or
not the sequence of packets has completed normally and a packet to send
to the other side if the sequence of packets hasn't completed. The tuple
A (completed, ticket) tuple comprised of a boolean indicating whether or
not the sequence of tickets has completed normally and a ticket to send
to the other side if the sequence of tickets hasn't completed. The tuple
will never have both a True first element and a non-None second element.
"""
if self._emissions is None:
@ -310,29 +310,29 @@ class _TransmittingTransmissionManager(TransmissionManager):
complete = self._emission_complete and not self._emissions
sequence_number = self._lowest_unused_sequence_number
self._lowest_unused_sequence_number += 1
return complete, self._packetizer.packetize(
return complete, self._ticketizer.ticketize(
self._operation_id, sequence_number, payload, complete)
else:
return self._emission_complete, None
else:
packet = self._abortive_response_packet(self._outcome)
ticket = self._abortive_response_ticket(self._outcome)
self._emissions = None
return False, None if packet is None else packet
return False, None if ticket is None else ticket
def _transmit(self, packet):
"""Commences the transmission loop sending packets.
def _transmit(self, ticket):
"""Commences the transmission loop sending tickets.
Args:
packet: A packet to be sent to the other side of the operation.
ticket: A ticket to be sent to the other side of the operation.
"""
def transmit(packet):
def transmit(ticket):
while True:
transmission_outcome = callable_util.call_logging_exceptions(
self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, packet)
self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
if transmission_outcome.exception is None:
with self._lock:
complete, packet = self._next_packet()
if packet is None:
complete, ticket = self._next_ticket()
if ticket is None:
if complete:
self._termination_manager.transmission_complete()
self._transmitting = False
@ -348,7 +348,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
return
self._pool.submit(callable_util.with_exceptions_logged(
transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), packet)
transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
self._transmitting = True
def inmit(self, emission, complete):
@ -358,17 +358,17 @@ class _TransmittingTransmissionManager(TransmissionManager):
if self._transmitting:
self._emissions.append(emission)
else:
self._transmit(self._lead_packet(emission, complete))
self._transmit(self._lead_ticket(emission, complete))
def abort(self, outcome):
"""See _interfaces.TransmissionManager.abort for specification."""
if self._emissions is not None and self._outcome is None:
self._outcome = outcome
if not self._transmitting:
packet = self._abortive_response_packet(outcome)
ticket = self._abortive_response_ticket(outcome)
self._emissions = None
if packet is not None:
self._transmit(packet)
if ticket is not None:
self._transmit(ticket)
def front_transmission_manager(
@ -378,14 +378,14 @@ def front_transmission_manager(
Args:
lock: The operation-servicing-wide lock object.
pool: A thread pool in which the work of transmitting packets will be
pool: A thread pool in which the work of transmitting tickets will be
performed.
callback: A callable that accepts packets and sends them to the other side
callback: A callable that accepts tickets and sends them to the other side
of the operation.
operation_id: The operation's ID.
name: The name of the operation.
subscription_kind: An interfaces.ServicedSubscription.Kind value
describing the interest the front has in packets sent from the back.
describing the interest the front has in tickets sent from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
@ -396,7 +396,7 @@ def front_transmission_manager(
A TransmissionManager appropriate for front-side use.
"""
return _TransmittingTransmissionManager(
lock, pool, callback, operation_id, _FrontPacketizer(
lock, pool, callback, operation_id, _FrontTicketizer(
name, subscription_kind, trace_id, timeout),
termination_manager)
@ -408,15 +408,15 @@ def back_transmission_manager(
Args:
lock: The operation-servicing-wide lock object.
pool: A thread pool in which the work of transmitting packets will be
pool: A thread pool in which the work of transmitting tickets will be
performed.
callback: A callable that accepts packets and sends them to the other side
callback: A callable that accepts tickets and sends them to the other side
of the operation.
operation_id: The operation's ID.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
subscription_kind: An interfaces.ServicedSubscription.Kind value
describing the interest the front has in packets sent from the back.
describing the interest the front has in tickets sent from the back.
Returns:
A TransmissionManager appropriate for back-side use.
@ -425,5 +425,5 @@ def back_transmission_manager(
return _EmptyTransmissionManager()
else:
return _TransmittingTransmissionManager(
lock, pool, callback, operation_id, _BackPacketizer(),
lock, pool, callback, operation_id, _BackTicketizer(),
termination_manager)

@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into the packet-exchange-based implementation the base layer."""
"""Entry points into the ticket-exchange-based base layer implementation."""
# interfaces is referenced from specification in this module.
from grpc.framework.base import _ends

@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into the packet-exchange-based implementation the base layer."""
"""In-memory implementations of base layer interfaces."""
import threading

@ -233,9 +233,9 @@ class Back(End):
__metaclass__ = abc.ABCMeta
class FrontToBackPacket(
class FrontToBackTicket(
collections.namedtuple(
'FrontToBackPacket',
'FrontToBackTicket',
['operation_id', 'sequence_number', 'kind', 'name', 'subscription',
'trace_id', 'payload', 'timeout'])):
"""A sum type for all values sent from a front to a back.
@ -244,14 +244,14 @@ class FrontToBackPacket(
operation_id: A unique-with-respect-to-equality hashable object identifying
a particular operation.
sequence_number: A zero-indexed integer sequence number identifying the
packet's place among all the packets sent from front to back for this
ticket's place among all the tickets sent from front to back for this
particular operation. Must be zero if kind is Kind.COMMENCEMENT or
Kind.ENTIRE. Must be positive for any other kind.
kind: A Kind value describing the overall kind of ticket.
name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT
or Kind.ENTIRE. Must be None for any other kind.
subscription: An ServicedSubscription.Kind value describing the interest
the front has in packets sent from the back. Must be present if
the front has in tickets sent from the back. Must be present if
kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind.
trace_id: A uuid.UUID identifying a set of related operations to which this
operation belongs. May be None.
@ -269,7 +269,7 @@ class FrontToBackPacket(
@enum.unique
class Kind(enum.Enum):
"""Identifies the overall kind of a FrontToBackPacket."""
"""Identifies the overall kind of a FrontToBackTicket."""
COMMENCEMENT = 'commencement'
CONTINUATION = 'continuation'
@ -283,9 +283,9 @@ class FrontToBackPacket(
TRANSMISSION_FAILURE = 'transmission failure'
class BackToFrontPacket(
class BackToFrontTicket(
collections.namedtuple(
'BackToFrontPacket',
'BackToFrontTicket',
['operation_id', 'sequence_number', 'kind', 'payload'])):
"""A sum type for all values sent from a back to a front.
@ -293,7 +293,7 @@ class BackToFrontPacket(
operation_id: A unique-with-respect-to-equality hashable object identifying
a particular operation.
sequence_number: A zero-indexed integer sequence number identifying the
packet's place among all the packets sent from back to front for this
ticket's place among all the tickets sent from back to front for this
particular operation.
kind: A Kind value describing the overall kind of ticket.
payload: A customer payload object. Must be present if kind is
@ -303,7 +303,7 @@ class BackToFrontPacket(
@enum.unique
class Kind(enum.Enum):
"""Identifies the overall kind of a BackToFrontPacket."""
"""Identifies the overall kind of a BackToFrontTicket."""
CONTINUATION = 'continuation'
COMPLETION = 'completion'
@ -321,10 +321,10 @@ class ForeLink(object):
@abc.abstractmethod
def accept_back_to_front_ticket(self, ticket):
"""Accept a BackToFrontPacket.
"""Accept a BackToFrontTicket.
Args:
ticket: Any BackToFrontPacket.
ticket: Any BackToFrontTicket.
"""
raise NotImplementedError()
@ -340,10 +340,10 @@ class RearLink(object):
@abc.abstractmethod
def accept_front_to_back_ticket(self, ticket):
"""Accepts a FrontToBackPacket.
"""Accepts a FrontToBackTicket.
Args:
ticket: Any FrontToBackPacket.
ticket: Any FrontToBackTicket.
"""
raise NotImplementedError()

@ -164,7 +164,7 @@ class FrontAndBackTest(object):
# pylint: disable=invalid-name
def testSimplestCall(self):
"""Tests the absolute simplest call - a one-packet fire-and-forget."""
"""Tests the absolute simplest call - a one-ticket fire-and-forget."""
self.front.operate(
SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT,
util.none_serviced_subscription(), 'test trace ID')
@ -175,25 +175,25 @@ class FrontAndBackTest(object):
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
# the back could have experienced execution up to this point:
# (1) The packet is still either in the front waiting to be transmitted
# (1) The ticket is still either in the front waiting to be transmitted
# or is somewhere on the link between the front and the back. The back has
# no idea that this test is even happening. Calling wait_for_idle on it
# would do no good because in this case the back is idle and the call would
# return with the packet bound for it still in the front or on the link.
# return with the ticket bound for it still in the front or on the link.
back_operation_stats = self.back.operation_stats()
first_back_possibility = EMPTY_OUTCOME_DICT
# (2) The packet arrived at the back and the back completed the operation.
# (2) The ticket arrived at the back and the back completed the operation.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.Outcome.COMPLETED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
# It's true that if the packet had arrived at the back and the back had
# It's true that if the ticket had arrived at the back and the back had
# begun processing that wait_for_idle could hold test execution until the
# back completed the operation, but that doesn't really collapse the
# possibility space down to one solution.
def testEntireEcho(self):
"""Tests a very simple one-packet-each-way round-trip."""
"""Tests a very simple one-ticket-each-way round-trip."""
test_payload = 'test payload'
test_consumer = stream_testing.TestConsumer()
subscription = util.full_serviced_subscription(
@ -212,7 +212,7 @@ class FrontAndBackTest(object):
self.assertListEqual([(test_payload, True)], test_consumer.calls)
def testBidirectionalStreamingEcho(self):
"""Tests sending multiple packets each way."""
"""Tests sending multiple tickets each way."""
test_payload_template = 'test_payload: %03d'
test_payloads = [test_payload_template % i for i in range(STREAM_LENGTH)]
test_consumer = stream_testing.TestConsumer()
@ -255,16 +255,16 @@ class FrontAndBackTest(object):
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
# the back could have experienced execution up to this point:
# (1) Both packets are still either in the front waiting to be transmitted
# (1) Both tickets are still either in the front waiting to be transmitted
# or are somewhere on the link between the front and the back. The back has
# no idea that this test is even happening. Calling wait_for_idle on it
# would do no good because in this case the back is idle and the call would
# return with the packets bound for it still in the front or on the link.
# return with the tickets bound for it still in the front or on the link.
back_operation_stats = self.back.operation_stats()
first_back_possibility = EMPTY_OUTCOME_DICT
# (2) Both packets arrived within SMALL_TIMEOUT of one another at the back.
# The back started processing based on the first packet and then stopped
# upon receiving the cancellation packet.
# (2) Both tickets arrived within SMALL_TIMEOUT of one another at the back.
# The back started processing based on the first ticket and then stopped
# upon receiving the cancellation ticket.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.Outcome.CANCELLED] = 1
self.assertIn(

Loading…
Cancel
Save