diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py index 336e9c21fd9..8e07d9061e0 100644 --- a/src/python/grpcio/grpc/framework/core/_end.py +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -168,7 +168,7 @@ class _End(End): def operate( self, group, method, subscription, timeout, initial_metadata=None, - payload=None, completion=None): + payload=None, completion=None, protocol_options=None): """See base.End.operate for specification.""" operation_id = uuid.uuid4() with self._lock: @@ -177,9 +177,9 @@ class _End(End): termination_action = _termination_action( self._lock, self._stats, operation_id, self._cycle) operation = _operation.invocation_operate( - operation_id, group, method, subscription, timeout, initial_metadata, - payload, completion, self._mate.accept_ticket, termination_action, - self._cycle.pool) + operation_id, group, method, subscription, timeout, protocol_options, + initial_metadata, payload, completion, self._mate.accept_ticket, + termination_action, self._cycle.pool) self._cycle.operations[operation_id] = operation return operation.context, operation.operator diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py index 2a534cb7e7b..7ac440722cb 100644 --- a/src/python/grpcio/grpc/framework/core/_interfaces.py +++ b/src/python/grpcio/grpc/framework/core/_interfaces.py @@ -111,8 +111,8 @@ class TransmissionManager(object): @abc.abstractmethod def kick_off( - self, group, method, timeout, initial_metadata, payload, completion, - allowance): + self, group, method, timeout, protocol_options, initial_metadata, + payload, completion, allowance): """Transmits the values associated with operation invocation.""" raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py index f5679d03569..d4eacc5a3fa 100644 --- a/src/python/grpcio/grpc/framework/core/_operation.py +++ b/src/python/grpcio/grpc/framework/core/_operation.py @@ -84,8 +84,9 @@ class _EasyOperation(_interfaces.Operation): def invocation_operate( - operation_id, group, method, subscription, timeout, initial_metadata, - payload, completion, ticket_sink, termination_action, pool): + operation_id, group, method, subscription, timeout, protocol_options, + initial_metadata, payload, completion, ticket_sink, termination_action, + pool): """Constructs objects necessary for front-side operation management. Args: @@ -95,6 +96,8 @@ def invocation_operate( subscription: A base.Subscription describing the customer's interest in the results of the operation. timeout: A length of time in seconds to allow for the operation. + protocol_options: A transport-specific, application-specific, and/or + protocol-specific value relating to the invocation. May be None. initial_metadata: An initial metadata value to be sent to the other side of the operation. May be None if the initial metadata will be passed later or if there will be no initial metadata passed at all. @@ -136,7 +139,8 @@ def invocation_operate( emission_manager.set_ingestion_manager(ingestion_manager) transmission_manager.kick_off( - group, method, timeout, initial_metadata, payload, completion, None) + group, method, timeout, protocol_options, initial_metadata, payload, + completion, None) return _EasyOperation( lock, termination_manager, transmission_manager, expiration_manager, diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py index 8f852cfe9f6..65b12c4160e 100644 --- a/src/python/grpcio/grpc/framework/core/_transmission.py +++ b/src/python/grpcio/grpc/framework/core/_transmission.py @@ -207,18 +207,19 @@ class TransmissionManager(_interfaces.TransmissionManager): self._transmitting = True def kick_off( - self, group, method, timeout, initial_metadata, payload, completion, - allowance): + self, group, method, timeout, protocol_options, initial_metadata, + payload, completion, allowance): """See _interfaces.TransmissionManager.kickoff for specification.""" # TODO(nathaniel): Support other subscriptions. subscription = links.Ticket.Subscription.FULL terminal_metadata, code, message, termination = _explode_completion( completion) self._remote_allowance = 1 if payload is None else 0 + protocol = links.Protocol(links.Protocol.Kind.CALL_OPTION, protocol_options) ticket = links.Ticket( self._operation_id, 0, group, method, subscription, timeout, allowance, initial_metadata, payload, terminal_metadata, code, message, - termination, None) + termination, protocol) self._lowest_unused_sequence_number = 1 self._transmit(ticket) diff --git a/src/python/grpcio/grpc/framework/crust/_calls.py b/src/python/grpcio/grpc/framework/crust/_calls.py index 4c6bf16f432..68db9fab8ef 100644 --- a/src/python/grpcio/grpc/framework/crust/_calls.py +++ b/src/python/grpcio/grpc/framework/crust/_calls.py @@ -38,12 +38,14 @@ _ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!' _EMPTY_COMPLETION = utilities.completion(None, None, None) -def _invoke(end, group, method, timeout, initial_metadata, payload, complete): +def _invoke( + end, group, method, timeout, protocol_options, initial_metadata, payload, + complete): rendezvous = _control.Rendezvous(None, None) operation_context, operator = end.operate( group, method, utilities.full_subscription(rendezvous), timeout, - initial_metadata=initial_metadata, payload=payload, - completion=_EMPTY_COMPLETION if complete else None) + protocol_options=protocol_options, initial_metadata=initial_metadata, + payload=payload, completion=_EMPTY_COMPLETION if complete else None) rendezvous.set_operator_and_context(operator, operation_context) outcome = operation_context.add_termination_callback(rendezvous.set_outcome) if outcome is not None: @@ -93,36 +95,43 @@ def _event_return_stream( def blocking_unary_unary( - end, group, method, timeout, with_call, initial_metadata, payload): + end, group, method, timeout, with_call, protocol_options, initial_metadata, + payload): """Services in a blocking fashion a unary-unary servicer method.""" rendezvous, unused_operation_context, unused_outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) if with_call: return next(rendezvous), rendezvous else: return next(rendezvous) -def future_unary_unary(end, group, method, timeout, initial_metadata, payload): +def future_unary_unary( + end, group, method, timeout, protocol_options, initial_metadata, payload): """Services a value-in value-out servicer method by returning a Future.""" rendezvous, unused_operation_context, unused_outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) return rendezvous -def inline_unary_stream(end, group, method, timeout, initial_metadata, payload): +def inline_unary_stream( + end, group, method, timeout, protocol_options, initial_metadata, payload): """Services a value-in stream-out servicer method.""" rendezvous, unused_operation_context, unused_outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) return rendezvous def blocking_stream_unary( - end, group, method, timeout, with_call, initial_metadata, payload_iterator, - pool): + end, group, method, timeout, with_call, protocol_options, initial_metadata, + payload_iterator, pool): """Services in a blocking fashion a stream-in value-out servicer method.""" rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) if outcome is None: def in_pool(): for payload in payload_iterator: @@ -141,10 +150,12 @@ def blocking_stream_unary( def future_stream_unary( - end, group, method, timeout, initial_metadata, payload_iterator, pool): + end, group, method, timeout, protocol_options, initial_metadata, + payload_iterator, pool): """Services a stream-in value-out servicer method by returning a Future.""" rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) if outcome is None: def in_pool(): for payload in payload_iterator: @@ -155,10 +166,12 @@ def future_stream_unary( def inline_stream_stream( - end, group, method, timeout, initial_metadata, payload_iterator, pool): + end, group, method, timeout, protocol_options, initial_metadata, + payload_iterator, pool): """Services a stream-in stream-out servicer method.""" rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) if outcome is None: def in_pool(): for payload in payload_iterator: @@ -169,36 +182,40 @@ def inline_stream_stream( def event_unary_unary( - end, group, method, timeout, initial_metadata, payload, receiver, - abortion_callback, pool): + end, group, method, timeout, protocol_options, initial_metadata, payload, + receiver, abortion_callback, pool): rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) return _event_return_unary( receiver, abortion_callback, rendezvous, operation_context, outcome, pool) def event_unary_stream( - end, group, method, timeout, initial_metadata, payload, + end, group, method, timeout, protocol_options, initial_metadata, payload, receiver, abortion_callback, pool): rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) return _event_return_stream( receiver, abortion_callback, rendezvous, operation_context, outcome, pool) def event_stream_unary( - end, group, method, timeout, initial_metadata, receiver, abortion_callback, - pool): + end, group, method, timeout, protocol_options, initial_metadata, receiver, + abortion_callback, pool): rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) return _event_return_unary( receiver, abortion_callback, rendezvous, operation_context, outcome, pool) def event_stream_stream( - end, group, method, timeout, initial_metadata, receiver, abortion_callback, - pool): + end, group, method, timeout, protocol_options, initial_metadata, receiver, + abortion_callback, pool): rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) return _event_return_stream( receiver, abortion_callback, rendezvous, operation_context, outcome, pool) diff --git a/src/python/grpcio/grpc/framework/crust/_control.py b/src/python/grpcio/grpc/framework/crust/_control.py index 7bddf46a572..e02a41d7209 100644 --- a/src/python/grpcio/grpc/framework/crust/_control.py +++ b/src/python/grpcio/grpc/framework/crust/_control.py @@ -442,6 +442,10 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call): else: return self._termination.abortion + def protocol_context(self): + with self._condition: + raise NotImplementedError('TODO: protocol context implementation!') + def initial_metadata(self): with self._condition: while True: diff --git a/src/python/grpcio/grpc/framework/crust/_service.py b/src/python/grpcio/grpc/framework/crust/_service.py index 6ff7249e75c..f1855c2f471 100644 --- a/src/python/grpcio/grpc/framework/crust/_service.py +++ b/src/python/grpcio/grpc/framework/crust/_service.py @@ -52,6 +52,9 @@ class _ServicerContext(face.ServicerContext): def cancel(self): self._rendezvous.cancel() + def protocol_context(self): + return self._rendezvous.protocol_context() + def invocation_metadata(self): return self._rendezvous.initial_metadata() diff --git a/src/python/grpcio/grpc/framework/crust/implementations.py b/src/python/grpcio/grpc/framework/crust/implementations.py index d38fab8ba01..4ebc4e9ae85 100644 --- a/src/python/grpcio/grpc/framework/crust/implementations.py +++ b/src/python/grpcio/grpc/framework/crust/implementations.py @@ -66,22 +66,23 @@ class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable): self._pool = pool def __call__( - self, request, timeout, metadata=None, with_call=False): + self, request, timeout, metadata=None, with_call=False, + protocol_options=None): return _calls.blocking_unary_unary( self._end, self._group, self._method, timeout, with_call, - metadata, request) + protocol_options, metadata, request) - def future(self, request, timeout, metadata=None): + def future(self, request, timeout, metadata=None, protocol_options=None): return _calls.future_unary_unary( - self._end, self._group, self._method, timeout, metadata, - request) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request) def event( self, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_unary_unary( - self._end, self._group, self._method, timeout, metadata, - request, receiver, abortion_callback, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request, receiver, abortion_callback, self._pool) class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable): @@ -92,17 +93,17 @@ class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable): self._method = method self._pool = pool - def __call__(self, request, timeout, metadata=None): + def __call__(self, request, timeout, metadata=None, protocol_options=None): return _calls.inline_unary_stream( - self._end, self._group, self._method, timeout, metadata, - request) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request) def event( self, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_unary_stream( - self._end, self._group, self._method, timeout, metadata, - request, receiver, abortion_callback, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request, receiver, abortion_callback, self._pool) class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable): @@ -115,21 +116,23 @@ class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable): def __call__( self, request_iterator, timeout, metadata=None, - with_call=False): + with_call=False, protocol_options=None): return _calls.blocking_stream_unary( self._end, self._group, self._method, timeout, with_call, - metadata, request_iterator, self._pool) + protocol_options, metadata, request_iterator, self._pool) - def future(self, request_iterator, timeout, metadata=None): + def future( + self, request_iterator, timeout, metadata=None, protocol_options=None): return _calls.future_stream_unary( - self._end, self._group, self._method, timeout, metadata, - request_iterator, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request_iterator, self._pool) def event( - self, receiver, abortion_callback, timeout, metadata=None): + self, receiver, abortion_callback, timeout, metadata=None, + protocol_options=None): return _calls.event_stream_unary( - self._end, self._group, self._method, timeout, metadata, - receiver, abortion_callback, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, receiver, abortion_callback, self._pool) class _StreamStreamMultiCallable(face.StreamStreamMultiCallable): @@ -140,16 +143,18 @@ class _StreamStreamMultiCallable(face.StreamStreamMultiCallable): self._method = method self._pool = pool - def __call__(self, request_iterator, timeout, metadata=None): + def __call__( + self, request_iterator, timeout, metadata=None, protocol_options=None): return _calls.inline_stream_stream( - self._end, self._group, self._method, timeout, metadata, - request_iterator, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request_iterator, self._pool) def event( - self, receiver, abortion_callback, timeout, metadata=None): + self, receiver, abortion_callback, timeout, metadata=None, + protocol_options=None): return _calls.event_stream_stream( - self._end, self._group, self._method, timeout, metadata, - receiver, abortion_callback, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, receiver, abortion_callback, self._pool) class _GenericStub(face.GenericStub): @@ -161,66 +166,70 @@ class _GenericStub(face.GenericStub): def blocking_unary_unary( self, group, method, request, timeout, metadata=None, - with_call=None): + with_call=None, protocol_options=None): return _calls.blocking_unary_unary( - self._end, group, method, timeout, with_call, metadata, - request) + self._end, group, method, timeout, with_call, protocol_options, + metadata, request) def future_unary_unary( - self, group, method, request, timeout, metadata=None): + self, group, method, request, timeout, metadata=None, + protocol_options=None): return _calls.future_unary_unary( - self._end, group, method, timeout, metadata, request) + self._end, group, method, timeout, protocol_options, metadata, request) def inline_unary_stream( - self, group, method, request, timeout, metadata=None): + self, group, method, request, timeout, metadata=None, + protocol_options=None): return _calls.inline_unary_stream( - self._end, group, method, timeout, metadata, request) + self._end, group, method, timeout, protocol_options, metadata, request) def blocking_stream_unary( self, group, method, request_iterator, timeout, metadata=None, - with_call=None): + with_call=None, protocol_options=None): return _calls.blocking_stream_unary( - self._end, group, method, timeout, with_call, metadata, - request_iterator, self._pool) + self._end, group, method, timeout, with_call, protocol_options, + metadata, request_iterator, self._pool) def future_stream_unary( - self, group, method, request_iterator, timeout, metadata=None): + self, group, method, request_iterator, timeout, metadata=None, + protocol_options=None): return _calls.future_stream_unary( - self._end, group, method, timeout, metadata, + self._end, group, method, timeout, protocol_options, metadata, request_iterator, self._pool) def inline_stream_stream( - self, group, method, request_iterator, timeout, metadata=None): + self, group, method, request_iterator, timeout, metadata=None, + protocol_options=None): return _calls.inline_stream_stream( - self._end, group, method, timeout, metadata, + self._end, group, method, timeout, protocol_options, metadata, request_iterator, self._pool) def event_unary_unary( self, group, method, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_unary_unary( - self._end, group, method, timeout, metadata, request, + self._end, group, method, timeout, protocol_options, metadata, request, receiver, abortion_callback, self._pool) def event_unary_stream( self, group, method, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_unary_stream( - self._end, group, method, timeout, metadata, request, + self._end, group, method, timeout, protocol_options, metadata, request, receiver, abortion_callback, self._pool) def event_stream_unary( self, group, method, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_stream_unary( - self._end, group, method, timeout, metadata, receiver, + self._end, group, method, timeout, protocol_options, metadata, receiver, abortion_callback, self._pool) def event_stream_stream( self, group, method, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_stream_stream( - self._end, group, method, timeout, metadata, receiver, + self._end, group, method, timeout, protocol_options, metadata, receiver, abortion_callback, self._pool) def unary_unary(self, group, method): diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py index 0d9d6b464e2..013e7c66f22 100644 --- a/src/python/grpcio/grpc/framework/interfaces/base/base.py +++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py @@ -274,7 +274,7 @@ class End(object): @abc.abstractmethod def operate( self, group, method, subscription, timeout, initial_metadata=None, - payload=None, completion=None): + payload=None, completion=None, protocol_options=None): """Commences an operation. Args: @@ -290,6 +290,8 @@ class End(object): payload: An initial payload for the operation. completion: A Completion value indicating the end of transmission to the other side of the operation. + protocol_options: A value specified by the provider of a Base interface + implementation affording custom state and behavior. Returns: A pair of objects affording information about the operation and action diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py index 948e7505b6d..bc9a434a760 100644 --- a/src/python/grpcio/grpc/framework/interfaces/face/face.py +++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py @@ -184,6 +184,16 @@ class RpcContext(object): """ raise NotImplementedError() + @abc.abstractmethod + def protocol_context(self): + """Accesses a custom object specified by an implementation provider. + + Returns: + A value specified by the provider of a Face interface implementation + affording custom state and behavior. + """ + raise NotImplementedError() + class Call(RpcContext): """Invocation-side utility object for an RPC.""" @@ -354,7 +364,8 @@ class UnaryUnaryMultiCallable(object): @abc.abstractmethod def __call__( - self, request, timeout, metadata=None, with_call=False): + self, request, timeout, metadata=None, with_call=False, + protocol_options=None): """Synchronously invokes the underlying RPC. Args: @@ -364,6 +375,8 @@ class UnaryUnaryMultiCallable(object): the RPC. with_call: Whether or not to include return a Call for the RPC in addition to the reponse. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -375,7 +388,7 @@ class UnaryUnaryMultiCallable(object): raise NotImplementedError() @abc.abstractmethod - def future(self, request, timeout, metadata=None): + def future(self, request, timeout, metadata=None, protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -383,6 +396,8 @@ class UnaryUnaryMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and a future.Future. In the @@ -395,7 +410,7 @@ class UnaryUnaryMultiCallable(object): @abc.abstractmethod def event( self, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -406,6 +421,8 @@ class UnaryUnaryMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A Call for the RPC. @@ -418,7 +435,7 @@ class UnaryStreamMultiCallable(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def __call__(self, request, timeout, metadata=None): + def __call__(self, request, timeout, metadata=None, protocol_options=None): """Invokes the underlying RPC. Args: @@ -426,6 +443,8 @@ class UnaryStreamMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and an iterator of response @@ -437,7 +456,7 @@ class UnaryStreamMultiCallable(object): @abc.abstractmethod def event( self, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -448,6 +467,8 @@ class UnaryStreamMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A Call object for the RPC. @@ -462,7 +483,7 @@ class StreamUnaryMultiCallable(object): @abc.abstractmethod def __call__( self, request_iterator, timeout, metadata=None, - with_call=False): + with_call=False, protocol_options=None): """Synchronously invokes the underlying RPC. Args: @@ -472,6 +493,8 @@ class StreamUnaryMultiCallable(object): the RPC. with_call: Whether or not to include return a Call for the RPC in addition to the reponse. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -483,7 +506,8 @@ class StreamUnaryMultiCallable(object): raise NotImplementedError() @abc.abstractmethod - def future(self, request_iterator, timeout, metadata=None): + def future( + self, request_iterator, timeout, metadata=None, protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -491,6 +515,8 @@ class StreamUnaryMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and a future.Future. In the @@ -502,7 +528,8 @@ class StreamUnaryMultiCallable(object): @abc.abstractmethod def event( - self, receiver, abortion_callback, timeout, metadata=None): + self, receiver, abortion_callback, timeout, metadata=None, + protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -512,6 +539,8 @@ class StreamUnaryMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A single object that is both a Call object for the RPC and a @@ -525,7 +554,8 @@ class StreamStreamMultiCallable(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def __call__(self, request_iterator, timeout, metadata=None): + def __call__( + self, request_iterator, timeout, metadata=None, protocol_options=None): """Invokes the underlying RPC. Args: @@ -533,6 +563,8 @@ class StreamStreamMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and an iterator of response @@ -543,7 +575,8 @@ class StreamStreamMultiCallable(object): @abc.abstractmethod def event( - self, receiver, abortion_callback, timeout, metadata=None): + self, receiver, abortion_callback, timeout, metadata=None, + protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -553,6 +586,8 @@ class StreamStreamMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A single object that is both a Call object for the RPC and a @@ -646,7 +681,7 @@ class GenericStub(object): @abc.abstractmethod def blocking_unary_unary( self, group, method, request, timeout, metadata=None, - with_call=False): + with_call=False, protocol_options=None): """Invokes a unary-request-unary-response method. This method blocks until either returning the response value of the RPC @@ -661,6 +696,8 @@ class GenericStub(object): metadata: A metadata value to be passed to the service-side of the RPC. with_call: Whether or not to include return a Call for the RPC in addition to the reponse. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -673,7 +710,8 @@ class GenericStub(object): @abc.abstractmethod def future_unary_unary( - self, group, method, request, timeout, metadata=None): + self, group, method, request, timeout, metadata=None, + protocol_options=None): """Invokes a unary-request-unary-response method. Args: @@ -682,6 +720,8 @@ class GenericStub(object): request: The request value for the RPC. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and a future.Future. In the @@ -693,7 +733,8 @@ class GenericStub(object): @abc.abstractmethod def inline_unary_stream( - self, group, method, request, timeout, metadata=None): + self, group, method, request, timeout, metadata=None, + protocol_options=None): """Invokes a unary-request-stream-response method. Args: @@ -702,6 +743,8 @@ class GenericStub(object): request: The request value for the RPC. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and an iterator of response @@ -713,7 +756,7 @@ class GenericStub(object): @abc.abstractmethod def blocking_stream_unary( self, group, method, request_iterator, timeout, metadata=None, - with_call=False): + with_call=False, protocol_options=None): """Invokes a stream-request-unary-response method. This method blocks until either returning the response value of the RPC @@ -728,6 +771,8 @@ class GenericStub(object): metadata: A metadata value to be passed to the service-side of the RPC. with_call: Whether or not to include return a Call for the RPC in addition to the reponse. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -740,7 +785,8 @@ class GenericStub(object): @abc.abstractmethod def future_stream_unary( - self, group, method, request_iterator, timeout, metadata=None): + self, group, method, request_iterator, timeout, metadata=None, + protocol_options=None): """Invokes a stream-request-unary-response method. Args: @@ -749,6 +795,8 @@ class GenericStub(object): request_iterator: An iterator that yields request values for the RPC. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and a future.Future. In the @@ -760,7 +808,8 @@ class GenericStub(object): @abc.abstractmethod def inline_stream_stream( - self, group, method, request_iterator, timeout, metadata=None): + self, group, method, request_iterator, timeout, metadata=None, + protocol_options=None): """Invokes a stream-request-stream-response method. Args: @@ -769,6 +818,8 @@ class GenericStub(object): request_iterator: An iterator that yields request values for the RPC. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and an iterator of response @@ -780,7 +831,7 @@ class GenericStub(object): @abc.abstractmethod def event_unary_unary( self, group, method, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Event-driven invocation of a unary-request-unary-response method. Args: @@ -792,6 +843,8 @@ class GenericStub(object): in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A Call for the RPC. @@ -801,7 +854,7 @@ class GenericStub(object): @abc.abstractmethod def event_unary_stream( self, group, method, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Event-driven invocation of a unary-request-stream-response method. Args: @@ -813,6 +866,8 @@ class GenericStub(object): in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A Call for the RPC. @@ -822,7 +877,7 @@ class GenericStub(object): @abc.abstractmethod def event_stream_unary( self, group, method, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Event-driven invocation of a unary-request-unary-response method. Args: @@ -833,6 +888,8 @@ class GenericStub(object): in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A pair of a Call object for the RPC and a stream.Consumer to which the @@ -843,7 +900,7 @@ class GenericStub(object): @abc.abstractmethod def event_stream_stream( self, group, method, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Event-driven invocation of a unary-request-stream-response method. Args: @@ -854,6 +911,8 @@ class GenericStub(object): in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A pair of a Call object for the RPC and a stream.Consumer to which the diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py index b98a30a3990..24f0e3b3545 100644 --- a/src/python/grpcio/grpc/framework/interfaces/links/links.py +++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py @@ -34,14 +34,13 @@ import collections import enum -class Transport(collections.namedtuple('Transport', ('kind', 'value',))): - """A sum type for handles to an underlying transport system. +class Protocol(collections.namedtuple('Protocol', ('kind', 'value',))): + """A sum type for handles to a system that transmits tickets. Attributes: - kind: A Kind value identifying the kind of value being passed to or from - the underlying transport. - value: The value being passed through RPC Framework between the high-level - application and the underlying transport. + kind: A Kind value identifying the kind of value being passed. + value: The value being passed between the high-level application and the + system affording ticket transport. """ @enum.unique @@ -56,8 +55,7 @@ class Ticket( 'Ticket', ('operation_id', 'sequence_number', 'group', 'method', 'subscription', 'timeout', 'allowance', 'initial_metadata', 'payload', - 'terminal_metadata', 'code', 'message', 'termination', - 'transport',))): + 'terminal_metadata', 'code', 'message', 'termination', 'protocol',))): """A sum type for all values sent from a front to a back. Attributes: @@ -99,8 +97,8 @@ class Ticket( termination: A Termination value describing the end of the operation, or None if the operation has not yet terminated. If set, no further tickets may be sent in the same direction. - transport: A Transport value or None, with further semantics being a matter - between high-level application and underlying transport. + protocol: A Protocol value or None, with further semantics being a matter + between high-level application and underlying ticket transport. """ @enum.unique