diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py index 0058ae91f87..a74c77ebcc2 100644 --- a/src/python/grpcio/grpc/_links/invocation.py +++ b/src/python/grpcio/grpc/_links/invocation.py @@ -101,7 +101,7 @@ class _Kernel(object): else: ticket = links.Ticket( operation_id, rpc_state.sequence_number, None, None, None, None, 1, - None, None, None, None, None, None) + None, None, None, None, None, None, None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) rpc_state.low_write = _LowWrite.OPEN @@ -118,7 +118,7 @@ class _Kernel(object): ticket = links.Ticket( operation_id, rpc_state.sequence_number, None, None, None, None, None, None, rpc_state.response_deserializer(event.bytes), None, None, None, - None) + None, None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) @@ -129,7 +129,7 @@ class _Kernel(object): ticket = links.Ticket( operation_id, rpc_state.sequence_number, None, None, links.Ticket.Subscription.FULL, None, None, event.metadata, None, None, - None, None, None) + None, None, None, None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) @@ -146,7 +146,7 @@ class _Kernel(object): ticket = links.Ticket( operation_id, rpc_state.sequence_number, None, None, None, None, None, None, None, event.metadata, event.status.code, event.status.details, - termination) + termination, None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 5c636d61abc..de78e82cd61 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -131,7 +131,7 @@ class _Kernel(object): ticket = links.Ticket( call, 0, group, method, links.Ticket.Subscription.FULL, service_acceptance.deadline - time.time(), None, event.metadata, None, - None, None, None, None) + None, None, None, None, 'TODO: Service Context Object!') self._relay.add_value(ticket) def _on_read_event(self, event): @@ -157,7 +157,7 @@ class _Kernel(object): # rpc_state.read = _Read.AWAITING_ALLOWANCE ticket = links.Ticket( call, rpc_state.sequence_number, None, None, None, None, None, None, - payload, None, None, None, termination) + payload, None, None, None, termination, None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) @@ -176,7 +176,7 @@ class _Kernel(object): else: ticket = links.Ticket( call, rpc_state.sequence_number, None, None, None, None, 1, None, - None, None, None, None, None) + None, None, None, None, None, None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) rpc_state.low_write = _LowWrite.OPEN @@ -198,7 +198,7 @@ class _Kernel(object): termination = links.Ticket.Termination.TRANSMISSION_FAILURE ticket = links.Ticket( call, rpc_state.sequence_number, None, None, None, None, None, None, - None, None, None, None, termination) + None, None, None, None, termination, None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) @@ -259,7 +259,7 @@ class _Kernel(object): termination = links.Ticket.Termination.COMPLETION ticket = links.Ticket( call, rpc_state.sequence_number, None, None, None, None, None, - None, payload, None, None, None, termination) + None, payload, None, None, None, termination, None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py index 01894d398dc..03644f4d491 100644 --- a/src/python/grpcio/grpc/framework/core/_transmission.py +++ b/src/python/grpcio/grpc/framework/core/_transmission.py @@ -107,7 +107,7 @@ class TransmissionManager(_interfaces.TransmissionManager): return links.Ticket( self._operation_id, self._lowest_unused_sequence_number, None, None, None, None, None, None, None, None, None, None, - termination) + termination, None) action = False # TODO(nathaniel): Support other subscriptions. @@ -144,7 +144,7 @@ class TransmissionManager(_interfaces.TransmissionManager): ticket = links.Ticket( self._operation_id, self._lowest_unused_sequence_number, None, None, local_subscription, timeout, allowance, initial_metadata, payload, - terminal_metadata, code, message, termination) + terminal_metadata, code, message, termination, None) self._lowest_unused_sequence_number += 1 return ticket else: @@ -191,7 +191,7 @@ class TransmissionManager(_interfaces.TransmissionManager): ticket = links.Ticket( self._operation_id, 0, group, method, subscription, timeout, allowance, initial_metadata, payload, terminal_metadata, code, message, - termination) + termination, None) self._lowest_unused_sequence_number = 1 self._transmit(ticket) @@ -236,7 +236,7 @@ class TransmissionManager(_interfaces.TransmissionManager): ticket = links.Ticket( self._operation_id, self._lowest_unused_sequence_number, None, None, None, None, allowance, effective_initial_metadata, ticket_payload, - terminal_metadata, code, message, termination) + terminal_metadata, code, message, termination, None) self._lowest_unused_sequence_number += 1 self._transmit(ticket) @@ -247,7 +247,7 @@ class TransmissionManager(_interfaces.TransmissionManager): else: ticket = links.Ticket( self._operation_id, self._lowest_unused_sequence_number, None, None, - None, timeout, None, None, None, None, None, None, None) + None, timeout, None, None, None, None, None, None, None, None) self._lowest_unused_sequence_number += 1 self._transmit(ticket) @@ -268,7 +268,7 @@ class TransmissionManager(_interfaces.TransmissionManager): ticket = links.Ticket( self._operation_id, self._lowest_unused_sequence_number, None, None, None, None, None, None, payload, terminal_metadata, code, message, - termination) + termination, None) self._lowest_unused_sequence_number += 1 self._transmit(ticket) @@ -290,5 +290,5 @@ class TransmissionManager(_interfaces.TransmissionManager): ticket = links.Ticket( self._operation_id, self._lowest_unused_sequence_number, None, None, None, None, None, None, None, None, None, None, - termination) + termination, None) self._transmit(ticket) diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py index 069ff024ddc..b98a30a3990 100644 --- a/src/python/grpcio/grpc/framework/interfaces/links/links.py +++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py @@ -34,12 +34,30 @@ import collections import enum +class Transport(collections.namedtuple('Transport', ('kind', 'value',))): + """A sum type for handles to an underlying transport system. + + Attributes: + kind: A Kind value identifying the kind of value being passed to or from + the underlying transport. + value: The value being passed through RPC Framework between the high-level + application and the underlying transport. + """ + + @enum.unique + class Kind(enum.Enum): + CALL_OPTION = 'call option' + SERVICER_CONTEXT = 'servicer context' + INVOCATION_CONTEXT = 'invocation context' + + class Ticket( collections.namedtuple( 'Ticket', - ['operation_id', 'sequence_number', 'group', 'method', 'subscription', + ('operation_id', 'sequence_number', 'group', 'method', 'subscription', 'timeout', 'allowance', 'initial_metadata', 'payload', - 'terminal_metadata', 'code', 'message', 'termination'])): + 'terminal_metadata', 'code', 'message', 'termination', + 'transport',))): """A sum type for all values sent from a front to a back. Attributes: @@ -81,6 +99,8 @@ class Ticket( termination: A Termination value describing the end of the operation, or None if the operation has not yet terminated. If set, no further tickets may be sent in the same direction. + transport: A Transport value or None, with further semantics being a matter + between high-level application and underlying transport. """ @enum.unique diff --git a/src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py b/src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py index abe240e07ab..373a2b2a1f7 100644 --- a/src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py +++ b/src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py @@ -66,7 +66,7 @@ class LonelyInvocationLinkTest(unittest.TestCase): ticket = links.Ticket( test_operation_id, 0, test_group, test_method, links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None, - None, None, None, None, termination) + None, None, None, None, termination, None) invocation_link.accept_ticket(ticket) invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated) diff --git a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py index 9cdc9620f0a..02ddd512c22 100644 --- a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py +++ b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py @@ -128,14 +128,14 @@ class RoundTripTest(unittest.TestCase): invocation_ticket = links.Ticket( test_operation_id, 0, test_group, test_method, links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None, - None, None, None, None, links.Ticket.Termination.COMPLETION) + None, None, None, None, links.Ticket.Termination.COMPLETION, None) invocation_link.accept_ticket(invocation_ticket) service_mate.block_until_tickets_satisfy(test_cases.terminated) service_ticket = links.Ticket( service_mate.tickets()[-1].operation_id, 0, None, None, None, None, None, None, None, None, test_code, test_message, - links.Ticket.Termination.COMPLETION) + links.Ticket.Termination.COMPLETION, None) service_link.accept_ticket(service_ticket) invocation_mate.block_until_tickets_satisfy(test_cases.terminated) @@ -174,33 +174,34 @@ class RoundTripTest(unittest.TestCase): invocation_ticket = links.Ticket( test_operation_id, 0, test_group, test_method, links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None, - None, None, None, None, None) + None, None, None, None, None, None) invocation_link.accept_ticket(invocation_ticket) requests = scenario.requests() for request_index, request in enumerate(requests): request_ticket = links.Ticket( test_operation_id, 1 + request_index, None, None, None, None, 1, None, - request, None, None, None, None) + request, None, None, None, None, None) invocation_link.accept_ticket(request_ticket) service_mate.block_until_tickets_satisfy( test_cases.at_least_n_payloads_received_predicate(1 + request_index)) response_ticket = links.Ticket( service_mate.tickets()[0].operation_id, request_index, None, None, None, None, 1, None, scenario.response_for_request(request), None, - None, None, None) + None, None, None, None) service_link.accept_ticket(response_ticket) invocation_mate.block_until_tickets_satisfy( test_cases.at_least_n_payloads_received_predicate(1 + request_index)) request_count = len(requests) invocation_completion_ticket = links.Ticket( test_operation_id, request_count + 1, None, None, None, None, None, - None, None, None, None, None, links.Ticket.Termination.COMPLETION) + None, None, None, None, None, links.Ticket.Termination.COMPLETION, + None) invocation_link.accept_ticket(invocation_completion_ticket) service_mate.block_until_tickets_satisfy(test_cases.terminated) service_completion_ticket = links.Ticket( service_mate.tickets()[0].operation_id, request_count, None, None, None, None, None, None, None, None, test_code, test_message, - links.Ticket.Termination.COMPLETION) + links.Ticket.Termination.COMPLETION, None) service_link.accept_ticket(service_completion_ticket) invocation_mate.block_until_tickets_satisfy(test_cases.terminated) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py index 1e575d1a9ef..ecf49d9cdb5 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py @@ -300,7 +300,7 @@ class TransmissionTest(object): invocation_operation_id, 0, _TRANSMISSION_GROUP, _TRANSMISSION_METHOD, links.Ticket.Subscription.FULL, timeout, 0, invocation_initial_metadata, invocation_payload, invocation_terminal_metadata, invocation_code, - invocation_message, links.Ticket.Termination.COMPLETION) + invocation_message, links.Ticket.Termination.COMPLETION, None) self._invocation_link.accept_ticket(original_invocation_ticket) self._service_mate.block_until_tickets_satisfy( @@ -317,7 +317,7 @@ class TransmissionTest(object): service_operation_id, 0, None, None, links.Ticket.Subscription.FULL, timeout, 0, service_initial_metadata, service_payload, service_terminal_metadata, service_code, service_message, - links.Ticket.Termination.COMPLETION) + links.Ticket.Termination.COMPLETION, None) self._service_link.accept_ticket(original_service_ticket) self._invocation_mate.block_until_tickets_satisfy(terminated) self._assert_is_valid_service_sequence( diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py index a2bd7107c17..39c7f2fc635 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py @@ -64,7 +64,7 @@ def _safe_for_log_ticket(ticket): ticket.allowance, ticket.initial_metadata, ''.format(payload_length), ticket.terminal_metadata, ticket.code, ticket.message, - ticket.termination) + ticket.termination, None) class RecordingLink(links.Link):