@ -35,6 +35,7 @@ import enum
from grpc . framework . core import _constants
from grpc . framework . core import _constants
from grpc . framework . core import _interfaces
from grpc . framework . core import _interfaces
from grpc . framework . core import _utilities
from grpc . framework . foundation import abandonment
from grpc . framework . foundation import abandonment
from grpc . framework . foundation import callable_util
from grpc . framework . foundation import callable_util
from grpc . framework . interfaces . base import base
from grpc . framework . interfaces . base import base
@ -46,7 +47,7 @@ _INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
class _SubscriptionCreation (
class _SubscriptionCreation (
collections . namedtuple (
collections . namedtuple (
' _SubscriptionCreation ' ,
' _SubscriptionCreation ' ,
( ' kind ' , ' subscription ' , ' code ' , ' message ' , ) ) ) :
( ' kind ' , ' subscription ' , ' code ' , ' details ' , ) ) ) :
""" A sum type for the outcome of ingestion initialization.
""" A sum type for the outcome of ingestion initialization.
Attributes :
Attributes :
@ -56,7 +57,7 @@ class _SubscriptionCreation(
code : A code value to be sent to the other side of the operation along with
code : A code value to be sent to the other side of the operation along with
an indication that the operation is being aborted due to an error on the
an indication that the operation is being aborted due to an error on the
remote side of the operation . Only present if kind is Kind . REMOTE_ERROR .
remote side of the operation . Only present if kind is Kind . REMOTE_ERROR .
message : A message value to be sent to the other side of the operation
details : A details value to be sent to the other side of the operation
along with an indication that the operation is being aborted due to an
along with an indication that the operation is being aborted due to an
error on the remote side of the operation . Only present if kind is
error on the remote side of the operation . Only present if kind is
Kind . REMOTE_ERROR .
Kind . REMOTE_ERROR .
@ -190,11 +191,13 @@ class _IngestionManager(_interfaces.IngestionManager):
self . _pending_payloads = None
self . _pending_payloads = None
self . _pending_completion = None
self . _pending_completion = None
def _abort_and_notify ( self , outcome , code , message ) :
def _abort_and_notify ( self , outcome_kind , code , details ) :
self . _abort_internal_only ( )
self . _abort_internal_only ( )
self . _termination_manager . abort ( outcome )
if self . _termination_manager . outcome is None :
self . _transmission_manager . abort ( outcome , code , message )
outcome = _utilities . Outcome ( outcome_kind , code , details )
self . _expiration_manager . terminate ( )
self . _termination_manager . abort ( outcome )
self . _transmission_manager . abort ( outcome )
self . _expiration_manager . terminate ( )
def _operator_next ( self ) :
def _operator_next ( self ) :
""" Computes the next step for full-subscription ingestion.
""" Computes the next step for full-subscription ingestion.
@ -250,12 +253,13 @@ class _IngestionManager(_interfaces.IngestionManager):
else :
else :
with self . _lock :
with self . _lock :
if self . _termination_manager . outcome is None :
if self . _termination_manager . outcome is None :
self . _abort_and_notify ( base . Outcome . LOCAL_FAILURE , None , None )
self . _abort_and_notify (
base . Outcome . Kind . LOCAL_FAILURE , None , None )
return
return
else :
else :
with self . _lock :
with self . _lock :
if self . _termination_manager . outcome is None :
if self . _termination_manager . outcome is None :
self . _abort_and_notify ( base . Outcome . LOCAL_FAILURE , None , None )
self . _abort_and_notify ( base . Outcome . Kind . LOCAL_FAILURE , None , None )
return
return
def _operator_post_create ( self , subscription ) :
def _operator_post_create ( self , subscription ) :
@ -279,17 +283,18 @@ class _IngestionManager(_interfaces.IngestionManager):
if outcome . return_value is None :
if outcome . return_value is None :
with self . _lock :
with self . _lock :
if self . _termination_manager . outcome is None :
if self . _termination_manager . outcome is None :
self . _abort_and_notify ( base . Outcome . LOCAL_FAILURE , None , None )
self . _abort_and_notify ( base . Outcome . Kind . LOCAL_FAILURE , None , None )
elif outcome . return_value . kind is _SubscriptionCreation . Kind . ABANDONED :
elif outcome . return_value . kind is _SubscriptionCreation . Kind . ABANDONED :
with self . _lock :
with self . _lock :
if self . _termination_manager . outcome is None :
if self . _termination_manager . outcome is None :
self . _abort_and_notify ( base . Outcome . LOCAL_FAILURE , None , None )
self . _abort_and_notify ( base . Outcome . Kind . LOCAL_FAILURE , None , None )
elif outcome . return_value . kind is _SubscriptionCreation . Kind . REMOTE_ERROR :
elif outcome . return_value . kind is _SubscriptionCreation . Kind . REMOTE_ERROR :
code = outcome . return_value . code
code = outcome . return_value . code
message = outcome . return_value . message
details = outcome . return_value . details
with self . _lock :
with self . _lock :
if self . _termination_manager . outcome is None :
if self . _termination_manager . outcome is None :
self . _abort_and_notify ( base . Outcome . REMOTE_FAILURE , code , message )
self . _abort_and_notify (
base . Outcome . Kind . REMOTE_FAILURE , code , details )
elif outcome . return_value . subscription . kind is base . Subscription . Kind . FULL :
elif outcome . return_value . subscription . kind is base . Subscription . Kind . FULL :
self . _operator_post_create ( outcome . return_value . subscription )
self . _operator_post_create ( outcome . return_value . subscription )
else :
else :