@ -39,14 +39,24 @@ from grpc.framework.foundation import callable_util
_TRANSMISSION_EXCEPTION_LOG_MESSAGE = ' Exception during transmission! '
_FRONT_TO_BACK_NO_TRANSMISSION_KIND S = (
packets . Kind . SERVICER_FAILURE ,
_FRONT_TO_BACK_NO_TRANSMISSION_OUTCOME S = (
interfaces . Outcome . SERVICER_FAILURE ,
)
_BACK_TO_FRONT_NO_TRANSMISSION_KIND S = (
packets . Kind . CANCELLATION ,
packets . Kind . SERVICED_FAILURE ,
_BACK_TO_FRONT_NO_TRANSMISSION_OUTCOME S = (
interfaces . Outcome . CANCELLED ,
interfaces . Outcome . SERVICED_FAILURE ,
)
_ABORTION_OUTCOME_TO_PACKET_KIND = {
interfaces . Outcome . CANCELLED : packets . Kind . CANCELLATION ,
interfaces . Outcome . EXPIRED : packets . Kind . EXPIRATION ,
interfaces . Outcome . RECEPTION_FAILURE : packets . Kind . RECEPTION_FAILURE ,
interfaces . Outcome . TRANSMISSION_FAILURE : packets . Kind . TRANSMISSION_FAILURE ,
interfaces . Outcome . SERVICED_FAILURE : packets . Kind . SERVICED_FAILURE ,
interfaces . Outcome . SERVICER_FAILURE : packets . Kind . SERVICER_FAILURE ,
}
class _Packetizer ( object ) :
""" Common specification of different packet-creating behavior. """
@ -72,18 +82,18 @@ class _Packetizer(object):
raise NotImplementedError ( )
@abc . abstractmethod
def packetize_abortion ( self , operation_id , sequence_number , kind ) :
def packetize_abortion ( self , operation_id , sequence_number , outcome ) :
""" Creates a packet indicating that the operation is aborted.
Args :
operation_id : The operation ID for the current operation .
sequence_number : A sequence number for the packet .
kind : One of the values of packets . Kind indicating operational abortion .
outcome : An interfaces . Outcome value describing the operation abortion .
Returns :
An object of an appropriate type suitable for transmission to the other
side of the operation , or None if transmission is not appropriate for
the given kind .
the given outcome .
"""
raise NotImplementedError ( )
@ -122,11 +132,12 @@ class _FrontPacketizer(_Packetizer):
self . _name , self . _subscription_kind , self . _trace_id , payload ,
self . _timeout )
def packetize_abortion ( self , operation_id , sequence_number , kind ) :
def packetize_abortion ( self , operation_id , sequence_number , outcome ) :
""" See _Packetizer.packetize_abortion for specification. """
if kind in _FRONT_TO_BACK_NO_TRANSMISSION_KIND S:
if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOME S:
return None
else :
kind = _ABORTION_OUTCOME_TO_PACKET_KIND [ outcome ]
return packets . FrontToBackPacket (
operation_id , sequence_number , kind , None , None , None , None , None )
@ -141,11 +152,12 @@ class _BackPacketizer(_Packetizer):
packets . Kind . COMPLETION if complete else packets . Kind . CONTINUATION ,
payload )
def packetize_abortion ( self , operation_id , sequence_number , kind ) :
def packetize_abortion ( self , operation_id , sequence_number , outcome ) :
""" See _Packetizer.packetize_abortion for specification. """
if kind in _BACK_TO_FRONT_NO_TRANSMISSION_KIND S:
if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOME S:
return None
else :
kind = _ABORTION_OUTCOME_TO_PACKET_KIND [ outcome ]
return packets . BackToFrontPacket (
operation_id , sequence_number , kind , None )
@ -178,7 +190,7 @@ class _EmptyTransmissionManager(TransmissionManager):
def inmit ( self , emission , complete ) :
""" See _interfaces.TransmissionManager.inmit for specification. """
def abort ( self , category ) :
def abort ( self , outcome ) :
""" See _interfaces.TransmissionManager.abort for specification. """
@ -212,7 +224,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
self . _emissions = [ ]
self . _emission_complete = False
self . _kind = None
self . _outcome = None
self . _lowest_unused_sequence_number = 0
self . _transmitting = False
@ -239,17 +251,17 @@ class _TransmittingTransmissionManager(TransmissionManager):
return self . _packetizer . packetize (
self . _operation_id , sequence_number , emission , complete )
def _abortive_response_packet ( self , kind ) :
def _abortive_response_packet ( self , outcome ) :
""" Creates a packet indicating operation abortion.
Args :
kind : One of the values of packets . Kind indicating operational abortion .
outcome : An interfaces . Outcome value describing operation abortion .
Returns :
A packet indicating operation abortion .
"""
packet = self . _packetizer . packetize_abortion (
self . _operation_id , self . _lowest_unused_sequence_number , kind )
self . _operation_id , self . _lowest_unused_sequence_number , outcome )
if packet is None :
return None
else :
@ -267,7 +279,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
"""
if self . _emissions is None :
return False , None
elif self . _kind is None :
elif self . _outcome is None :
if self . _emissions :
payload = self . _emissions . pop ( 0 )
complete = self . _emission_complete and not self . _emissions
@ -278,7 +290,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
else :
return self . _emission_complete , None
else :
packet = self . _abortive_response_packet ( self . _kind )
packet = self . _abortive_response_packet ( self . _outcome )
self . _emissions = None
return False , None if packet is None else packet
@ -303,7 +315,8 @@ class _TransmittingTransmissionManager(TransmissionManager):
else :
with self . _lock :
self . _emissions = None
self . _termination_manager . abort ( packets . Kind . TRANSMISSION_FAILURE )
self . _termination_manager . abort (
interfaces . Outcome . TRANSMISSION_FAILURE )
self . _ingestion_manager . abort ( )
self . _expiration_manager . abort ( )
self . _transmitting = False
@ -315,19 +328,19 @@ class _TransmittingTransmissionManager(TransmissionManager):
def inmit ( self , emission , complete ) :
""" See _interfaces.TransmissionManager.inmit for specification. """
if self . _emissions is not None and self . _kind is None :
if self . _emissions is not None and self . _outcome is None :
self . _emission_complete = complete
if self . _transmitting :
self . _emissions . append ( emission )
else :
self . _transmit ( self . _lead_packet ( emission , complete ) )
def abort ( self , kind ) :
def abort ( self , outcome ) :
""" See _interfaces.TransmissionManager.abort for specification. """
if self . _emissions is not None and self . _kind is None :
self . _kind = kind
if self . _emissions is not None and self . _outcome is None :
self . _outcome = outcome
if not self . _transmitting :
packet = self . _abortive_response_packet ( kind )
packet = self . _abortive_response_packet ( outcome )
self . _emissions = None
if packet is not None :
self . _transmit ( packet )