|
|
|
@ -31,6 +31,7 @@ |
|
|
|
|
|
|
|
|
|
import abc |
|
|
|
|
import collections |
|
|
|
|
import enum |
|
|
|
|
|
|
|
|
|
from grpc.framework.core import _constants |
|
|
|
|
from grpc.framework.core import _interfaces |
|
|
|
@ -42,21 +43,31 @@ _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' |
|
|
|
|
_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _SubscriptionCreation(collections.namedtuple( |
|
|
|
|
'_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))): |
|
|
|
|
class _SubscriptionCreation( |
|
|
|
|
collections.namedtuple( |
|
|
|
|
'_SubscriptionCreation', |
|
|
|
|
('kind', 'subscription', 'code', 'message',))): |
|
|
|
|
"""A sum type for the outcome of ingestion initialization. |
|
|
|
|
|
|
|
|
|
Either subscription will be non-None, remote_error will be True, or abandoned |
|
|
|
|
will be True. |
|
|
|
|
|
|
|
|
|
Attributes: |
|
|
|
|
subscription: A base.Subscription describing the customer's interest in |
|
|
|
|
operation values from the other side. |
|
|
|
|
remote_error: A boolean indicating that the subscription could not be |
|
|
|
|
created due to an error on the remote side of the operation. |
|
|
|
|
abandoned: A boolean indicating that subscription creation was abandoned. |
|
|
|
|
kind: A Kind value coarsely indicating how subscription creation completed. |
|
|
|
|
subscription: The created subscription. Only present if kind is |
|
|
|
|
Kind.SUBSCRIPTION. |
|
|
|
|
code: A code value to be sent to the other side of the operation along with |
|
|
|
|
an indication that the operation is being aborted due to an error on the |
|
|
|
|
remote side of the operation. Only present if kind is Kind.REMOTE_ERROR. |
|
|
|
|
message: A message value to be sent to the other side of the operation |
|
|
|
|
along with an indication that the operation is being aborted due to an |
|
|
|
|
error on the remote side of the operation. Only present if kind is |
|
|
|
|
Kind.REMOTE_ERROR. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
@enum.unique |
|
|
|
|
class Kind(enum.Enum): |
|
|
|
|
SUBSCRIPTION = 'subscription' |
|
|
|
|
REMOTE_ERROR = 'remote error' |
|
|
|
|
ABANDONED = 'abandoned' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _SubscriptionCreator(object): |
|
|
|
|
"""Common specification of subscription-creating behavior.""" |
|
|
|
@ -101,12 +112,15 @@ class _ServiceSubscriptionCreator(_SubscriptionCreator): |
|
|
|
|
try: |
|
|
|
|
subscription = self._servicer.service( |
|
|
|
|
group, method, self._operation_context, self._output_operator) |
|
|
|
|
except base.NoSuchMethodError: |
|
|
|
|
return _SubscriptionCreation(None, True, False) |
|
|
|
|
except base.NoSuchMethodError as e: |
|
|
|
|
return _SubscriptionCreation( |
|
|
|
|
_SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.message) |
|
|
|
|
except abandonment.Abandoned: |
|
|
|
|
return _SubscriptionCreation(None, False, True) |
|
|
|
|
return _SubscriptionCreation( |
|
|
|
|
_SubscriptionCreation.Kind.ABANDONED, None, None, None) |
|
|
|
|
else: |
|
|
|
|
return _SubscriptionCreation(subscription, False, False) |
|
|
|
|
return _SubscriptionCreation( |
|
|
|
|
_SubscriptionCreation.Kind.SUBSCRIPTION, subscription, None, None) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _wrap(behavior): |
|
|
|
@ -176,10 +190,10 @@ class _IngestionManager(_interfaces.IngestionManager): |
|
|
|
|
self._pending_payloads = None |
|
|
|
|
self._pending_completion = None |
|
|
|
|
|
|
|
|
|
def _abort_and_notify(self, outcome): |
|
|
|
|
def _abort_and_notify(self, outcome, code, message): |
|
|
|
|
self._abort_internal_only() |
|
|
|
|
self._termination_manager.abort(outcome) |
|
|
|
|
self._transmission_manager.abort(outcome) |
|
|
|
|
self._transmission_manager.abort(outcome, code, message) |
|
|
|
|
self._expiration_manager.terminate() |
|
|
|
|
|
|
|
|
|
def _operator_next(self): |
|
|
|
@ -236,12 +250,12 @@ class _IngestionManager(_interfaces.IngestionManager): |
|
|
|
|
else: |
|
|
|
|
with self._lock: |
|
|
|
|
if self._termination_manager.outcome is None: |
|
|
|
|
self._abort_and_notify(base.Outcome.LOCAL_FAILURE) |
|
|
|
|
self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) |
|
|
|
|
return |
|
|
|
|
else: |
|
|
|
|
with self._lock: |
|
|
|
|
if self._termination_manager.outcome is None: |
|
|
|
|
self._abort_and_notify(base.Outcome.LOCAL_FAILURE) |
|
|
|
|
self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
def _operator_post_create(self, subscription): |
|
|
|
@ -260,20 +274,22 @@ class _IngestionManager(_interfaces.IngestionManager): |
|
|
|
|
|
|
|
|
|
def _create(self, subscription_creator, group, name): |
|
|
|
|
outcome = callable_util.call_logging_exceptions( |
|
|
|
|
subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, |
|
|
|
|
group, name) |
|
|
|
|
subscription_creator.create, |
|
|
|
|
_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, group, name) |
|
|
|
|
if outcome.return_value is None: |
|
|
|
|
with self._lock: |
|
|
|
|
if self._termination_manager.outcome is None: |
|
|
|
|
self._abort_and_notify(base.Outcome.LOCAL_FAILURE) |
|
|
|
|
elif outcome.return_value.abandoned: |
|
|
|
|
self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) |
|
|
|
|
elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED: |
|
|
|
|
with self._lock: |
|
|
|
|
if self._termination_manager.outcome is None: |
|
|
|
|
self._abort_and_notify(base.Outcome.LOCAL_FAILURE) |
|
|
|
|
elif outcome.return_value.remote_error: |
|
|
|
|
self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) |
|
|
|
|
elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR: |
|
|
|
|
code = outcome.return_value.code |
|
|
|
|
message = outcome.return_value.message |
|
|
|
|
with self._lock: |
|
|
|
|
if self._termination_manager.outcome is None: |
|
|
|
|
self._abort_and_notify(base.Outcome.REMOTE_FAILURE) |
|
|
|
|
self._abort_and_notify(base.Outcome.REMOTE_FAILURE, code, message) |
|
|
|
|
elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL: |
|
|
|
|
self._operator_post_create(outcome.return_value.subscription) |
|
|
|
|
else: |
|
|
|
|