Merge pull request #2918 from nathanielmanistaatgoogle/2916

Service-side read without allowance
pull/2925/head
Masood Malekghassemi 9 years ago
commit dcd6112f2c
  1. 43
      src/python/grpcio/grpc/_links/service.py
  2. 9
      src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py

@ -44,7 +44,10 @@ from grpc.framework.interfaces.links import links
@enum.unique
class _Read(enum.Enum):
READING = 'reading'
AWAITING_ALLOWANCE = 'awaiting allowance'
# TODO(issue 2916): This state will again be necessary after eliminating the
# "early_read" field of _RPCState and going back to only reading when granted
# allowance to read.
# AWAITING_ALLOWANCE = 'awaiting allowance'
CLOSED = 'closed'
@ -67,12 +70,15 @@ class _RPCState(object):
def __init__(
self, request_deserializer, response_serializer, sequence_number, read,
allowance, high_write, low_write, premetadataed, terminal_metadata, code,
message):
early_read, allowance, high_write, low_write, premetadataed,
terminal_metadata, code, message):
self.request_deserializer = request_deserializer
self.response_serializer = response_serializer
self.sequence_number = sequence_number
self.read = read
# TODO(issue 2916): Eliminate this by eliminating the necessity of calling
# call.read just to advance the RPC.
self.early_read = early_read # A raw (not deserialized) read.
self.allowance = allowance
self.high_write = high_write
self.low_write = low_write
@ -120,7 +126,7 @@ class _Kernel(object):
call.read(call)
self._rpc_states[call] = _RPCState(
request_deserializer, response_serializer, 1, _Read.READING, 0,
request_deserializer, response_serializer, 1, _Read.READING, None, 1,
_HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None)
ticket = links.Ticket(
call, 0, group, method, links.Ticket.Subscription.FULL,
@ -140,12 +146,15 @@ class _Kernel(object):
termination = links.Ticket.Termination.COMPLETION
else:
if 0 < rpc_state.allowance:
payload = rpc_state.request_deserializer(event.bytes)
termination = None
rpc_state.allowance -= 1
call.read(call)
else:
rpc_state.read = _Read.AWAITING_ALLOWANCE
payload = rpc_state.request_deserializer(event.bytes)
termination = None
rpc_state.early_read = event.bytes
return
# TODO(issue 2916): Instead of returning:
# rpc_state.read = _Read.AWAITING_ALLOWANCE
ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None, None,
payload, None, None, None, termination)
@ -237,12 +246,22 @@ class _Kernel(object):
rpc_state.premetadataed = True
if ticket.allowance is not None:
if rpc_state.read is _Read.AWAITING_ALLOWANCE:
rpc_state.allowance += ticket.allowance - 1
call.read(call)
rpc_state.read = _Read.READING
else:
if rpc_state.early_read is None:
rpc_state.allowance += ticket.allowance
else:
payload = rpc_state.request_deserializer(rpc_state.early_read)
rpc_state.allowance += ticket.allowance - 1
rpc_state.early_read = None
if rpc_state.read is _Read.READING:
call.read(call)
termination = None
else:
termination = links.Ticket.Termination.COMPLETION
ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None,
None, payload, None, None, None, termination)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
if ticket.payload is not None:
call.write(rpc_state.response_serializer(ticket.payload), call)

@ -303,16 +303,9 @@ class TransmissionTest(object):
invocation_message, links.Ticket.Termination.COMPLETION)
self._invocation_link.accept_ticket(original_invocation_ticket)
# TODO(nathaniel): This shouldn't be necessary. Detecting the end of the
# invocation-side ticket sequence shouldn't require granting allowance for
# another payload.
self._service_mate.block_until_tickets_satisfy(
at_least_n_payloads_received_predicate(1))
service_operation_id = self._service_mate.tickets()[0].operation_id
self._service_link.accept_ticket(
links.Ticket(
service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
None, 1, None, None, None, None, None, None))
self._service_mate.block_until_tickets_satisfy(terminated)
self._assert_is_valid_invocation_sequence(
@ -321,7 +314,7 @@ class TransmissionTest(object):
invocation_terminal_metadata, links.Ticket.Termination.COMPLETION)
original_service_ticket = links.Ticket(
service_operation_id, 1, None, None, links.Ticket.Subscription.FULL,
service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
timeout, 0, service_initial_metadata, service_payload,
service_terminal_metadata, service_code, service_message,
links.Ticket.Termination.COMPLETION)

Loading…
Cancel
Save