Merge pull request #3039 from nathanielmanistaatgoogle/transport-objects

Add a "transport" field to links.Ticket
pull/3046/head
Masood Malekghassemi 10 years ago
commit 8c4549aec7
  1. 8
      src/python/grpcio/grpc/_links/invocation.py
  2. 10
      src/python/grpcio/grpc/_links/service.py
  3. 14
      src/python/grpcio/grpc/framework/core/_transmission.py
  4. 24
      src/python/grpcio/grpc/framework/interfaces/links/links.py
  5. 2
      src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py
  6. 15
      src/python/grpcio_test/grpc_test/_links/_transmission_test.py
  7. 4
      src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
  8. 2
      src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py

@ -101,7 +101,7 @@ class _Kernel(object):
else: else:
ticket = links.Ticket( ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, None, None, 1, operation_id, rpc_state.sequence_number, None, None, None, None, 1,
None, None, None, None, None, None) None, None, None, None, None, None, None)
rpc_state.sequence_number += 1 rpc_state.sequence_number += 1
self._relay.add_value(ticket) self._relay.add_value(ticket)
rpc_state.low_write = _LowWrite.OPEN rpc_state.low_write = _LowWrite.OPEN
@ -118,7 +118,7 @@ class _Kernel(object):
ticket = links.Ticket( ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, None, None, None, operation_id, rpc_state.sequence_number, None, None, None, None, None,
None, rpc_state.response_deserializer(event.bytes), None, None, None, None, rpc_state.response_deserializer(event.bytes), None, None, None,
None) None, None)
rpc_state.sequence_number += 1 rpc_state.sequence_number += 1
self._relay.add_value(ticket) self._relay.add_value(ticket)
@ -129,7 +129,7 @@ class _Kernel(object):
ticket = links.Ticket( ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, operation_id, rpc_state.sequence_number, None, None,
links.Ticket.Subscription.FULL, None, None, event.metadata, None, None, links.Ticket.Subscription.FULL, None, None, event.metadata, None, None,
None, None, None) None, None, None, None)
rpc_state.sequence_number += 1 rpc_state.sequence_number += 1
self._relay.add_value(ticket) self._relay.add_value(ticket)
@ -146,7 +146,7 @@ class _Kernel(object):
ticket = links.Ticket( ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, None, None, None, operation_id, rpc_state.sequence_number, None, None, None, None, None,
None, None, event.metadata, event.status.code, event.status.details, None, None, event.metadata, event.status.code, event.status.details,
termination) termination, None)
rpc_state.sequence_number += 1 rpc_state.sequence_number += 1
self._relay.add_value(ticket) self._relay.add_value(ticket)

@ -131,7 +131,7 @@ class _Kernel(object):
ticket = links.Ticket( ticket = links.Ticket(
call, 0, group, method, links.Ticket.Subscription.FULL, call, 0, group, method, links.Ticket.Subscription.FULL,
service_acceptance.deadline - time.time(), None, event.metadata, None, service_acceptance.deadline - time.time(), None, event.metadata, None,
None, None, None, None) None, None, None, None, 'TODO: Service Context Object!')
self._relay.add_value(ticket) self._relay.add_value(ticket)
def _on_read_event(self, event): def _on_read_event(self, event):
@ -157,7 +157,7 @@ class _Kernel(object):
# rpc_state.read = _Read.AWAITING_ALLOWANCE # rpc_state.read = _Read.AWAITING_ALLOWANCE
ticket = links.Ticket( ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None, None, call, rpc_state.sequence_number, None, None, None, None, None, None,
payload, None, None, None, termination) payload, None, None, None, termination, None)
rpc_state.sequence_number += 1 rpc_state.sequence_number += 1
self._relay.add_value(ticket) self._relay.add_value(ticket)
@ -176,7 +176,7 @@ class _Kernel(object):
else: else:
ticket = links.Ticket( ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, 1, None, call, rpc_state.sequence_number, None, None, None, None, 1, None,
None, None, None, None, None) None, None, None, None, None, None)
rpc_state.sequence_number += 1 rpc_state.sequence_number += 1
self._relay.add_value(ticket) self._relay.add_value(ticket)
rpc_state.low_write = _LowWrite.OPEN rpc_state.low_write = _LowWrite.OPEN
@ -198,7 +198,7 @@ class _Kernel(object):
termination = links.Ticket.Termination.TRANSMISSION_FAILURE termination = links.Ticket.Termination.TRANSMISSION_FAILURE
ticket = links.Ticket( ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None, None, call, rpc_state.sequence_number, None, None, None, None, None, None,
None, None, None, None, termination) None, None, None, None, termination, None)
rpc_state.sequence_number += 1 rpc_state.sequence_number += 1
self._relay.add_value(ticket) self._relay.add_value(ticket)
@ -259,7 +259,7 @@ class _Kernel(object):
termination = links.Ticket.Termination.COMPLETION termination = links.Ticket.Termination.COMPLETION
ticket = links.Ticket( ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None, call, rpc_state.sequence_number, None, None, None, None, None,
None, payload, None, None, None, termination) None, payload, None, None, None, termination, None)
rpc_state.sequence_number += 1 rpc_state.sequence_number += 1
self._relay.add_value(ticket) self._relay.add_value(ticket)

@ -107,7 +107,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
return links.Ticket( return links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, self._operation_id, self._lowest_unused_sequence_number, None,
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None,
termination) termination, None)
action = False action = False
# TODO(nathaniel): Support other subscriptions. # TODO(nathaniel): Support other subscriptions.
@ -144,7 +144,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
ticket = links.Ticket( ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, None, self._operation_id, self._lowest_unused_sequence_number, None, None,
local_subscription, timeout, allowance, initial_metadata, payload, local_subscription, timeout, allowance, initial_metadata, payload,
terminal_metadata, code, message, termination) terminal_metadata, code, message, termination, None)
self._lowest_unused_sequence_number += 1 self._lowest_unused_sequence_number += 1
return ticket return ticket
else: else:
@ -191,7 +191,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
ticket = links.Ticket( ticket = links.Ticket(
self._operation_id, 0, group, method, subscription, timeout, allowance, self._operation_id, 0, group, method, subscription, timeout, allowance,
initial_metadata, payload, terminal_metadata, code, message, initial_metadata, payload, terminal_metadata, code, message,
termination) termination, None)
self._lowest_unused_sequence_number = 1 self._lowest_unused_sequence_number = 1
self._transmit(ticket) self._transmit(ticket)
@ -236,7 +236,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
ticket = links.Ticket( ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, None, self._operation_id, self._lowest_unused_sequence_number, None, None,
None, None, allowance, effective_initial_metadata, ticket_payload, None, None, allowance, effective_initial_metadata, ticket_payload,
terminal_metadata, code, message, termination) terminal_metadata, code, message, termination, None)
self._lowest_unused_sequence_number += 1 self._lowest_unused_sequence_number += 1
self._transmit(ticket) self._transmit(ticket)
@ -247,7 +247,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
else: else:
ticket = links.Ticket( ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, None, self._operation_id, self._lowest_unused_sequence_number, None, None,
None, timeout, None, None, None, None, None, None, None) None, timeout, None, None, None, None, None, None, None, None)
self._lowest_unused_sequence_number += 1 self._lowest_unused_sequence_number += 1
self._transmit(ticket) self._transmit(ticket)
@ -268,7 +268,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
ticket = links.Ticket( ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, None, self._operation_id, self._lowest_unused_sequence_number, None, None,
None, None, None, None, payload, terminal_metadata, code, message, None, None, None, None, payload, terminal_metadata, code, message,
termination) termination, None)
self._lowest_unused_sequence_number += 1 self._lowest_unused_sequence_number += 1
self._transmit(ticket) self._transmit(ticket)
@ -290,5 +290,5 @@ class TransmissionManager(_interfaces.TransmissionManager):
ticket = links.Ticket( ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None, self._operation_id, self._lowest_unused_sequence_number, None,
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None,
termination) termination, None)
self._transmit(ticket) self._transmit(ticket)

@ -34,12 +34,30 @@ import collections
import enum import enum
class Transport(collections.namedtuple('Transport', ('kind', 'value',))):
"""A sum type for handles to an underlying transport system.
Attributes:
kind: A Kind value identifying the kind of value being passed to or from
the underlying transport.
value: The value being passed through RPC Framework between the high-level
application and the underlying transport.
"""
@enum.unique
class Kind(enum.Enum):
CALL_OPTION = 'call option'
SERVICER_CONTEXT = 'servicer context'
INVOCATION_CONTEXT = 'invocation context'
class Ticket( class Ticket(
collections.namedtuple( collections.namedtuple(
'Ticket', 'Ticket',
['operation_id', 'sequence_number', 'group', 'method', 'subscription', ('operation_id', 'sequence_number', 'group', 'method', 'subscription',
'timeout', 'allowance', 'initial_metadata', 'payload', 'timeout', 'allowance', 'initial_metadata', 'payload',
'terminal_metadata', 'code', 'message', 'termination'])): 'terminal_metadata', 'code', 'message', 'termination',
'transport',))):
"""A sum type for all values sent from a front to a back. """A sum type for all values sent from a front to a back.
Attributes: Attributes:
@ -81,6 +99,8 @@ class Ticket(
termination: A Termination value describing the end of the operation, or termination: A Termination value describing the end of the operation, or
None if the operation has not yet terminated. If set, no further tickets None if the operation has not yet terminated. If set, no further tickets
may be sent in the same direction. may be sent in the same direction.
transport: A Transport value or None, with further semantics being a matter
between high-level application and underlying transport.
""" """
@enum.unique @enum.unique

@ -66,7 +66,7 @@ class LonelyInvocationLinkTest(unittest.TestCase):
ticket = links.Ticket( ticket = links.Ticket(
test_operation_id, 0, test_group, test_method, test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None, links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None,
None, None, None, None, termination) None, None, None, None, termination, None)
invocation_link.accept_ticket(ticket) invocation_link.accept_ticket(ticket)
invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated) invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated)

@ -128,14 +128,14 @@ class RoundTripTest(unittest.TestCase):
invocation_ticket = links.Ticket( invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method, test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None, links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
None, None, None, None, links.Ticket.Termination.COMPLETION) None, None, None, None, links.Ticket.Termination.COMPLETION, None)
invocation_link.accept_ticket(invocation_ticket) invocation_link.accept_ticket(invocation_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated) service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_ticket = links.Ticket( service_ticket = links.Ticket(
service_mate.tickets()[-1].operation_id, 0, None, None, None, None, service_mate.tickets()[-1].operation_id, 0, None, None, None, None,
None, None, None, None, test_code, test_message, None, None, None, None, test_code, test_message,
links.Ticket.Termination.COMPLETION) links.Ticket.Termination.COMPLETION, None)
service_link.accept_ticket(service_ticket) service_link.accept_ticket(service_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated) invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
@ -174,33 +174,34 @@ class RoundTripTest(unittest.TestCase):
invocation_ticket = links.Ticket( invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method, test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None, links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
None, None, None, None, None) None, None, None, None, None, None)
invocation_link.accept_ticket(invocation_ticket) invocation_link.accept_ticket(invocation_ticket)
requests = scenario.requests() requests = scenario.requests()
for request_index, request in enumerate(requests): for request_index, request in enumerate(requests):
request_ticket = links.Ticket( request_ticket = links.Ticket(
test_operation_id, 1 + request_index, None, None, None, None, 1, None, test_operation_id, 1 + request_index, None, None, None, None, 1, None,
request, None, None, None, None) request, None, None, None, None, None)
invocation_link.accept_ticket(request_ticket) invocation_link.accept_ticket(request_ticket)
service_mate.block_until_tickets_satisfy( service_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index)) test_cases.at_least_n_payloads_received_predicate(1 + request_index))
response_ticket = links.Ticket( response_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_index, None, None, service_mate.tickets()[0].operation_id, request_index, None, None,
None, None, 1, None, scenario.response_for_request(request), None, None, None, 1, None, scenario.response_for_request(request), None,
None, None, None) None, None, None, None)
service_link.accept_ticket(response_ticket) service_link.accept_ticket(response_ticket)
invocation_mate.block_until_tickets_satisfy( invocation_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index)) test_cases.at_least_n_payloads_received_predicate(1 + request_index))
request_count = len(requests) request_count = len(requests)
invocation_completion_ticket = links.Ticket( invocation_completion_ticket = links.Ticket(
test_operation_id, request_count + 1, None, None, None, None, None, test_operation_id, request_count + 1, None, None, None, None, None,
None, None, None, None, None, links.Ticket.Termination.COMPLETION) None, None, None, None, None, links.Ticket.Termination.COMPLETION,
None)
invocation_link.accept_ticket(invocation_completion_ticket) invocation_link.accept_ticket(invocation_completion_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated) service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_completion_ticket = links.Ticket( service_completion_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_count, None, None, None, service_mate.tickets()[0].operation_id, request_count, None, None, None,
None, None, None, None, None, test_code, test_message, None, None, None, None, None, test_code, test_message,
links.Ticket.Termination.COMPLETION) links.Ticket.Termination.COMPLETION, None)
service_link.accept_ticket(service_completion_ticket) service_link.accept_ticket(service_completion_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated) invocation_mate.block_until_tickets_satisfy(test_cases.terminated)

@ -300,7 +300,7 @@ class TransmissionTest(object):
invocation_operation_id, 0, _TRANSMISSION_GROUP, _TRANSMISSION_METHOD, invocation_operation_id, 0, _TRANSMISSION_GROUP, _TRANSMISSION_METHOD,
links.Ticket.Subscription.FULL, timeout, 0, invocation_initial_metadata, links.Ticket.Subscription.FULL, timeout, 0, invocation_initial_metadata,
invocation_payload, invocation_terminal_metadata, invocation_code, invocation_payload, invocation_terminal_metadata, invocation_code,
invocation_message, links.Ticket.Termination.COMPLETION) invocation_message, links.Ticket.Termination.COMPLETION, None)
self._invocation_link.accept_ticket(original_invocation_ticket) self._invocation_link.accept_ticket(original_invocation_ticket)
self._service_mate.block_until_tickets_satisfy( self._service_mate.block_until_tickets_satisfy(
@ -317,7 +317,7 @@ class TransmissionTest(object):
service_operation_id, 0, None, None, links.Ticket.Subscription.FULL, service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
timeout, 0, service_initial_metadata, service_payload, timeout, 0, service_initial_metadata, service_payload,
service_terminal_metadata, service_code, service_message, service_terminal_metadata, service_code, service_message,
links.Ticket.Termination.COMPLETION) links.Ticket.Termination.COMPLETION, None)
self._service_link.accept_ticket(original_service_ticket) self._service_link.accept_ticket(original_service_ticket)
self._invocation_mate.block_until_tickets_satisfy(terminated) self._invocation_mate.block_until_tickets_satisfy(terminated)
self._assert_is_valid_service_sequence( self._assert_is_valid_service_sequence(

@ -64,7 +64,7 @@ def _safe_for_log_ticket(ticket):
ticket.allowance, ticket.initial_metadata, ticket.allowance, ticket.initial_metadata,
'<payload of length {}>'.format(payload_length), '<payload of length {}>'.format(payload_length),
ticket.terminal_metadata, ticket.code, ticket.message, ticket.terminal_metadata, ticket.code, ticket.message,
ticket.termination) ticket.termination, None)
class RecordingLink(links.Link): class RecordingLink(links.Link):

Loading…
Cancel
Save