Merge pull request #3170 from nathanielmanistaatgoogle/fixes

Fixes for bugs found during manual use
pull/3177/head
Masood Malekghassemi 9 years ago
commit 924b67da5e
  1. 2
      src/python/grpcio/grpc/framework/core/_ingestion.py
  2. 101
      src/python/grpcio/grpc/framework/core/_transmission.py
  3. 2
      src/python/grpcio/grpc/framework/crust/_calls.py
  4. 2
      src/python/grpcio/grpc/framework/crust/_service.py
  5. 13
      src/python/grpcio/grpc/framework/crust/implementations.py
  6. 8
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py

@ -114,7 +114,7 @@ class _ServiceSubscriptionCreator(_SubscriptionCreator):
group, method, self._operation_context, self._output_operator) group, method, self._operation_context, self._output_operator)
except base.NoSuchMethodError as e: except base.NoSuchMethodError as e:
return _SubscriptionCreation( return _SubscriptionCreation(
_SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.message) _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.details)
except abandonment.Abandoned: except abandonment.Abandoned:
return _SubscriptionCreation( return _SubscriptionCreation(
_SubscriptionCreation.Kind.ABANDONED, None, None, None) _SubscriptionCreation.Kind.ABANDONED, None, None, None)

@ -29,6 +29,9 @@
"""State and behavior for ticket transmission during an operation.""" """State and behavior for ticket transmission during an operation."""
import collections
import enum
from grpc.framework.core import _constants from grpc.framework.core import _constants
from grpc.framework.core import _interfaces from grpc.framework.core import _interfaces
from grpc.framework.foundation import callable_util from grpc.framework.foundation import callable_util
@ -47,6 +50,31 @@ def _explode_completion(completion):
links.Ticket.Termination.COMPLETION) links.Ticket.Termination.COMPLETION)
class _Abort(
collections.namedtuple(
'_Abort', ('kind', 'termination', 'code', 'details',))):
"""Tracks whether the operation aborted and what is to be done about it.
Attributes:
kind: A Kind value describing the overall kind of the _Abort.
termination: A links.Ticket.Termination value to be sent to the other side
of the operation. Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
code: A code value to be sent to the other side of the operation. Only
valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
details: A details value to be sent to the other side of the operation.
Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
"""
@enum.unique
class Kind(enum.Enum):
NOT_ABORTED = 'not aborted'
ABORTED_NOTIFY_NEEDED = 'aborted notify needed'
ABORTED_NO_NOTIFY = 'aborted no notify'
_NOT_ABORTED = _Abort(_Abort.Kind.NOT_ABORTED, None, None, None)
_ABORTED_NO_NOTIFY = _Abort(_Abort.Kind.ABORTED_NO_NOTIFY, None, None, None)
class TransmissionManager(_interfaces.TransmissionManager): class TransmissionManager(_interfaces.TransmissionManager):
"""An _interfaces.TransmissionManager that sends links.Tickets.""" """An _interfaces.TransmissionManager that sends links.Tickets."""
@ -79,8 +107,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
self._initial_metadata = None self._initial_metadata = None
self._payloads = [] self._payloads = []
self._completion = None self._completion = None
self._aborted = False self._abort = _NOT_ABORTED
self._abortion_outcome = None
self._transmitting = False self._transmitting = False
def set_expiration_manager(self, expiration_manager): def set_expiration_manager(self, expiration_manager):
@ -94,24 +121,15 @@ class TransmissionManager(_interfaces.TransmissionManager):
A links.Ticket to be sent to the other side of the operation or None if A links.Ticket to be sent to the other side of the operation or None if
there is nothing to be sent at this time. there is nothing to be sent at this time.
""" """
if self._aborted: if self._abort.kind is _Abort.Kind.ABORTED_NO_NOTIFY:
if self._abortion_outcome is None: return None
return None elif self._abort.kind is _Abort.Kind.ABORTED_NOTIFY_NEEDED:
else: termination = self._abort.termination
termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ code, details = self._abort.code, self._abort.details
self._abortion_outcome] self._abort = _ABORTED_NO_NOTIFY
if termination is None: return links.Ticket(
return None self._operation_id, self._lowest_unused_sequence_number, None, None,
else: None, None, None, None, None, None, code, details, termination, None)
self._abortion_outcome = None
if self._completion is None:
code, message = None, None
else:
code, message = self._completion.code, self._completion.message
return links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None,
None, None, None, None, None, None, None, code, message,
termination, None)
action = False action = False
# TODO(nathaniel): Support other subscriptions. # TODO(nathaniel): Support other subscriptions.
@ -174,6 +192,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
return return
else: else:
with self._lock: with self._lock:
self._abort = _ABORTED_NO_NOTIFY
if self._termination_manager.outcome is None: if self._termination_manager.outcome is None:
self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE) self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
self._expiration_manager.terminate() self._expiration_manager.terminate()
@ -201,6 +220,9 @@ class TransmissionManager(_interfaces.TransmissionManager):
def advance(self, initial_metadata, payload, completion, allowance): def advance(self, initial_metadata, payload, completion, allowance):
"""See _interfaces.TransmissionManager.advance for specification.""" """See _interfaces.TransmissionManager.advance for specification."""
if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
return
effective_initial_metadata = initial_metadata effective_initial_metadata = initial_metadata
effective_payload = payload effective_payload = payload
effective_completion = completion effective_completion = completion
@ -246,7 +268,9 @@ class TransmissionManager(_interfaces.TransmissionManager):
def timeout(self, timeout): def timeout(self, timeout):
"""See _interfaces.TransmissionManager.timeout for specification.""" """See _interfaces.TransmissionManager.timeout for specification."""
if self._transmitting: if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
return
elif self._transmitting:
self._timeout = timeout self._timeout = timeout
else: else:
ticket = links.Ticket( ticket = links.Ticket(
@ -257,7 +281,9 @@ class TransmissionManager(_interfaces.TransmissionManager):
def allowance(self, allowance): def allowance(self, allowance):
"""See _interfaces.TransmissionManager.allowance for specification.""" """See _interfaces.TransmissionManager.allowance for specification."""
if self._transmitting or not self._payloads: if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
return
elif self._transmitting or not self._payloads:
self._remote_allowance += allowance self._remote_allowance += allowance
else: else:
self._remote_allowance += allowance - 1 self._remote_allowance += allowance - 1
@ -283,20 +309,17 @@ class TransmissionManager(_interfaces.TransmissionManager):
def abort(self, outcome, code, message): def abort(self, outcome, code, message):
"""See _interfaces.TransmissionManager.abort for specification.""" """See _interfaces.TransmissionManager.abort for specification."""
if self._transmitting: if self._abort.kind is _Abort.Kind.NOT_ABORTED:
self._aborted, self._abortion_outcome = True, outcome termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get(
else: outcome)
self._aborted = True if termination is None:
if outcome is not None: self._abort = _ABORTED_NO_NOTIFY
termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ elif self._transmitting:
outcome] self._abort = _Abort(
if termination is not None: _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, code, message)
if self._completion is None: else:
code, message = None, None ticket = links.Ticket(
else: self._operation_id, self._lowest_unused_sequence_number, None,
code, message = self._completion.code, self._completion.message None, None, None, None, None, None, None, code, message,
ticket = links.Ticket( termination, None)
self._operation_id, self._lowest_unused_sequence_number, None, self._transmit(ticket)
None, None, None, None, None, None, None, code, message,
termination, None)
self._transmit(ticket)

@ -98,7 +98,7 @@ def blocking_unary_unary(
rendezvous, unused_operation_context, unused_outcome = _invoke( rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True) end, group, method, timeout, initial_metadata, payload, True)
if with_call: if with_call:
return next(rendezvous, rendezvous) return next(rendezvous), rendezvous
else: else:
return next(rendezvous) return next(rendezvous)

@ -154,7 +154,7 @@ def adapt_multi_method(multi_method, pool):
outcome = operation_context.add_termination_callback(rendezvous.set_outcome) outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is None: if outcome is None:
def in_pool(): def in_pool():
request_consumer = multi_method( request_consumer = multi_method.service(
group, method, rendezvous, _ServicerContext(rendezvous)) group, method, rendezvous, _ServicerContext(rendezvous))
for request in rendezvous: for request in rendezvous:
request_consumer.consume(request) request_consumer.consume(request)

@ -49,12 +49,12 @@ class _BaseServicer(base.Servicer):
return adapted_method(output_operator, context) return adapted_method(output_operator, context)
elif self._adapted_multi_method is not None: elif self._adapted_multi_method is not None:
try: try:
return self._adapted_multi_method.service( return self._adapted_multi_method(
group, method, output_operator, context) group, method, output_operator, context)
except face.NoSuchMethodError: except face.NoSuchMethodError:
raise base.NoSuchMethodError() raise base.NoSuchMethodError(None, None)
else: else:
raise base.NoSuchMethodError() raise base.NoSuchMethodError(None, None)
class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable): class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
@ -315,8 +315,11 @@ def servicer(method_implementations, multi_method_implementation, pool):
""" """
adapted_implementations = _adapt_method_implementations( adapted_implementations = _adapt_method_implementations(
method_implementations, pool) method_implementations, pool)
adapted_multi_method_implementation = _service.adapt_multi_method( if multi_method_implementation is None:
multi_method_implementation, pool) adapted_multi_method_implementation = None
else:
adapted_multi_method_implementation = _service.adapt_multi_method(
multi_method_implementation, pool)
return _BaseServicer( return _BaseServicer(
adapted_implementations, adapted_multi_method_implementation) adapted_implementations, adapted_multi_method_implementation)

@ -82,8 +82,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
for test_messages in test_messages_sequence: for test_messages in test_messages_sequence:
request = test_messages.request() request = test_messages.request()
response = self._invoker.blocking(group, method)( response, call = self._invoker.blocking(group, method)(
request, test_constants.LONG_TIMEOUT) request, test_constants.LONG_TIMEOUT, with_call=True)
test_messages.verify(request, response, self) test_messages.verify(request, response, self)
@ -105,8 +105,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
for test_messages in test_messages_sequence: for test_messages in test_messages_sequence:
requests = test_messages.requests() requests = test_messages.requests()
response = self._invoker.blocking(group, method)( response, call = self._invoker.blocking(group, method)(
iter(requests), test_constants.LONG_TIMEOUT) iter(requests), test_constants.LONG_TIMEOUT, with_call=True)
test_messages.verify(requests, response, self) test_messages.verify(requests, response, self)

Loading…
Cancel
Save