Merge pull request #3231 from nathanielmanistaatgoogle/protocol-objects

Initial work on protocol objects
pull/3246/head
Masood Malekghassemi 9 years ago
commit 4e3a113909
  1. 8
      src/python/grpcio/grpc/framework/core/_end.py
  2. 4
      src/python/grpcio/grpc/framework/core/_interfaces.py
  3. 10
      src/python/grpcio/grpc/framework/core/_operation.py
  4. 7
      src/python/grpcio/grpc/framework/core/_transmission.py
  5. 71
      src/python/grpcio/grpc/framework/crust/_calls.py
  6. 4
      src/python/grpcio/grpc/framework/crust/_control.py
  7. 3
      src/python/grpcio/grpc/framework/crust/_service.py
  8. 109
      src/python/grpcio/grpc/framework/crust/implementations.py
  9. 4
      src/python/grpcio/grpc/framework/interfaces/base/base.py
  10. 99
      src/python/grpcio/grpc/framework/interfaces/face/face.py
  11. 18
      src/python/grpcio/grpc/framework/interfaces/links/links.py

@ -168,7 +168,7 @@ class _End(End):
def operate( def operate(
self, group, method, subscription, timeout, initial_metadata=None, self, group, method, subscription, timeout, initial_metadata=None,
payload=None, completion=None): payload=None, completion=None, protocol_options=None):
"""See base.End.operate for specification.""" """See base.End.operate for specification."""
operation_id = uuid.uuid4() operation_id = uuid.uuid4()
with self._lock: with self._lock:
@ -177,9 +177,9 @@ class _End(End):
termination_action = _termination_action( termination_action = _termination_action(
self._lock, self._stats, operation_id, self._cycle) self._lock, self._stats, operation_id, self._cycle)
operation = _operation.invocation_operate( operation = _operation.invocation_operate(
operation_id, group, method, subscription, timeout, initial_metadata, operation_id, group, method, subscription, timeout, protocol_options,
payload, completion, self._mate.accept_ticket, termination_action, initial_metadata, payload, completion, self._mate.accept_ticket,
self._cycle.pool) termination_action, self._cycle.pool)
self._cycle.operations[operation_id] = operation self._cycle.operations[operation_id] = operation
return operation.context, operation.operator return operation.context, operation.operator

@ -111,8 +111,8 @@ class TransmissionManager(object):
@abc.abstractmethod @abc.abstractmethod
def kick_off( def kick_off(
self, group, method, timeout, initial_metadata, payload, completion, self, group, method, timeout, protocol_options, initial_metadata,
allowance): payload, completion, allowance):
"""Transmits the values associated with operation invocation.""" """Transmits the values associated with operation invocation."""
raise NotImplementedError() raise NotImplementedError()

@ -84,8 +84,9 @@ class _EasyOperation(_interfaces.Operation):
def invocation_operate( def invocation_operate(
operation_id, group, method, subscription, timeout, initial_metadata, operation_id, group, method, subscription, timeout, protocol_options,
payload, completion, ticket_sink, termination_action, pool): initial_metadata, payload, completion, ticket_sink, termination_action,
pool):
"""Constructs objects necessary for front-side operation management. """Constructs objects necessary for front-side operation management.
Args: Args:
@ -95,6 +96,8 @@ def invocation_operate(
subscription: A base.Subscription describing the customer's interest in the subscription: A base.Subscription describing the customer's interest in the
results of the operation. results of the operation.
timeout: A length of time in seconds to allow for the operation. timeout: A length of time in seconds to allow for the operation.
protocol_options: A transport-specific, application-specific, and/or
protocol-specific value relating to the invocation. May be None.
initial_metadata: An initial metadata value to be sent to the other side of initial_metadata: An initial metadata value to be sent to the other side of
the operation. May be None if the initial metadata will be passed later or the operation. May be None if the initial metadata will be passed later or
if there will be no initial metadata passed at all. if there will be no initial metadata passed at all.
@ -136,7 +139,8 @@ def invocation_operate(
emission_manager.set_ingestion_manager(ingestion_manager) emission_manager.set_ingestion_manager(ingestion_manager)
transmission_manager.kick_off( transmission_manager.kick_off(
group, method, timeout, initial_metadata, payload, completion, None) group, method, timeout, protocol_options, initial_metadata, payload,
completion, None)
return _EasyOperation( return _EasyOperation(
lock, termination_manager, transmission_manager, expiration_manager, lock, termination_manager, transmission_manager, expiration_manager,

@ -207,18 +207,19 @@ class TransmissionManager(_interfaces.TransmissionManager):
self._transmitting = True self._transmitting = True
def kick_off( def kick_off(
self, group, method, timeout, initial_metadata, payload, completion, self, group, method, timeout, protocol_options, initial_metadata,
allowance): payload, completion, allowance):
"""See _interfaces.TransmissionManager.kickoff for specification.""" """See _interfaces.TransmissionManager.kickoff for specification."""
# TODO(nathaniel): Support other subscriptions. # TODO(nathaniel): Support other subscriptions.
subscription = links.Ticket.Subscription.FULL subscription = links.Ticket.Subscription.FULL
terminal_metadata, code, message, termination = _explode_completion( terminal_metadata, code, message, termination = _explode_completion(
completion) completion)
self._remote_allowance = 1 if payload is None else 0 self._remote_allowance = 1 if payload is None else 0
protocol = links.Protocol(links.Protocol.Kind.CALL_OPTION, protocol_options)
ticket = links.Ticket( ticket = links.Ticket(
self._operation_id, 0, group, method, subscription, timeout, allowance, self._operation_id, 0, group, method, subscription, timeout, allowance,
initial_metadata, payload, terminal_metadata, code, message, initial_metadata, payload, terminal_metadata, code, message,
termination, None) termination, protocol)
self._lowest_unused_sequence_number = 1 self._lowest_unused_sequence_number = 1
self._transmit(ticket) self._transmit(ticket)

@ -38,12 +38,14 @@ _ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
_EMPTY_COMPLETION = utilities.completion(None, None, None) _EMPTY_COMPLETION = utilities.completion(None, None, None)
def _invoke(end, group, method, timeout, initial_metadata, payload, complete): def _invoke(
end, group, method, timeout, protocol_options, initial_metadata, payload,
complete):
rendezvous = _control.Rendezvous(None, None) rendezvous = _control.Rendezvous(None, None)
operation_context, operator = end.operate( operation_context, operator = end.operate(
group, method, utilities.full_subscription(rendezvous), timeout, group, method, utilities.full_subscription(rendezvous), timeout,
initial_metadata=initial_metadata, payload=payload, protocol_options=protocol_options, initial_metadata=initial_metadata,
completion=_EMPTY_COMPLETION if complete else None) payload=payload, completion=_EMPTY_COMPLETION if complete else None)
rendezvous.set_operator_and_context(operator, operation_context) rendezvous.set_operator_and_context(operator, operation_context)
outcome = operation_context.add_termination_callback(rendezvous.set_outcome) outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is not None: if outcome is not None:
@ -93,36 +95,43 @@ def _event_return_stream(
def blocking_unary_unary( def blocking_unary_unary(
end, group, method, timeout, with_call, initial_metadata, payload): end, group, method, timeout, with_call, protocol_options, initial_metadata,
payload):
"""Services in a blocking fashion a unary-unary servicer method.""" """Services in a blocking fashion a unary-unary servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke( rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True) end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
if with_call: if with_call:
return next(rendezvous), rendezvous return next(rendezvous), rendezvous
else: else:
return next(rendezvous) return next(rendezvous)
def future_unary_unary(end, group, method, timeout, initial_metadata, payload): def future_unary_unary(
end, group, method, timeout, protocol_options, initial_metadata, payload):
"""Services a value-in value-out servicer method by returning a Future.""" """Services a value-in value-out servicer method by returning a Future."""
rendezvous, unused_operation_context, unused_outcome = _invoke( rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True) end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return rendezvous return rendezvous
def inline_unary_stream(end, group, method, timeout, initial_metadata, payload): def inline_unary_stream(
end, group, method, timeout, protocol_options, initial_metadata, payload):
"""Services a value-in stream-out servicer method.""" """Services a value-in stream-out servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke( rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True) end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return rendezvous return rendezvous
def blocking_stream_unary( def blocking_stream_unary(
end, group, method, timeout, with_call, initial_metadata, payload_iterator, end, group, method, timeout, with_call, protocol_options, initial_metadata,
pool): payload_iterator, pool):
"""Services in a blocking fashion a stream-in value-out servicer method.""" """Services in a blocking fashion a stream-in value-out servicer method."""
rendezvous, operation_context, outcome = _invoke( rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False) end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None: if outcome is None:
def in_pool(): def in_pool():
for payload in payload_iterator: for payload in payload_iterator:
@ -141,10 +150,12 @@ def blocking_stream_unary(
def future_stream_unary( def future_stream_unary(
end, group, method, timeout, initial_metadata, payload_iterator, pool): end, group, method, timeout, protocol_options, initial_metadata,
payload_iterator, pool):
"""Services a stream-in value-out servicer method by returning a Future.""" """Services a stream-in value-out servicer method by returning a Future."""
rendezvous, operation_context, outcome = _invoke( rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False) end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None: if outcome is None:
def in_pool(): def in_pool():
for payload in payload_iterator: for payload in payload_iterator:
@ -155,10 +166,12 @@ def future_stream_unary(
def inline_stream_stream( def inline_stream_stream(
end, group, method, timeout, initial_metadata, payload_iterator, pool): end, group, method, timeout, protocol_options, initial_metadata,
payload_iterator, pool):
"""Services a stream-in stream-out servicer method.""" """Services a stream-in stream-out servicer method."""
rendezvous, operation_context, outcome = _invoke( rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False) end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None: if outcome is None:
def in_pool(): def in_pool():
for payload in payload_iterator: for payload in payload_iterator:
@ -169,36 +182,40 @@ def inline_stream_stream(
def event_unary_unary( def event_unary_unary(
end, group, method, timeout, initial_metadata, payload, receiver, end, group, method, timeout, protocol_options, initial_metadata, payload,
abortion_callback, pool): receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke( rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True) end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return _event_return_unary( return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool) receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_unary_stream( def event_unary_stream(
end, group, method, timeout, initial_metadata, payload, end, group, method, timeout, protocol_options, initial_metadata, payload,
receiver, abortion_callback, pool): receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke( rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True) end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return _event_return_stream( return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool) receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_stream_unary( def event_stream_unary(
end, group, method, timeout, initial_metadata, receiver, abortion_callback, end, group, method, timeout, protocol_options, initial_metadata, receiver,
pool): abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke( rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False) end, group, method, timeout, protocol_options, initial_metadata, None,
False)
return _event_return_unary( return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool) receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_stream_stream( def event_stream_stream(
end, group, method, timeout, initial_metadata, receiver, abortion_callback, end, group, method, timeout, protocol_options, initial_metadata, receiver,
pool): abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke( rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False) end, group, method, timeout, protocol_options, initial_metadata, None,
False)
return _event_return_stream( return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool) receiver, abortion_callback, rendezvous, operation_context, outcome, pool)

@ -442,6 +442,10 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
else: else:
return self._termination.abortion return self._termination.abortion
def protocol_context(self):
with self._condition:
raise NotImplementedError('TODO: protocol context implementation!')
def initial_metadata(self): def initial_metadata(self):
with self._condition: with self._condition:
while True: while True:

@ -52,6 +52,9 @@ class _ServicerContext(face.ServicerContext):
def cancel(self): def cancel(self):
self._rendezvous.cancel() self._rendezvous.cancel()
def protocol_context(self):
return self._rendezvous.protocol_context()
def invocation_metadata(self): def invocation_metadata(self):
return self._rendezvous.initial_metadata() return self._rendezvous.initial_metadata()

@ -66,22 +66,23 @@ class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
self._pool = pool self._pool = pool
def __call__( def __call__(
self, request, timeout, metadata=None, with_call=False): self, request, timeout, metadata=None, with_call=False,
protocol_options=None):
return _calls.blocking_unary_unary( return _calls.blocking_unary_unary(
self._end, self._group, self._method, timeout, with_call, self._end, self._group, self._method, timeout, with_call,
metadata, request) protocol_options, metadata, request)
def future(self, request, timeout, metadata=None): def future(self, request, timeout, metadata=None, protocol_options=None):
return _calls.future_unary_unary( return _calls.future_unary_unary(
self._end, self._group, self._method, timeout, metadata, self._end, self._group, self._method, timeout, protocol_options,
request) metadata, request)
def event( def event(
self, request, receiver, abortion_callback, timeout, self, request, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
return _calls.event_unary_unary( return _calls.event_unary_unary(
self._end, self._group, self._method, timeout, metadata, self._end, self._group, self._method, timeout, protocol_options,
request, receiver, abortion_callback, self._pool) metadata, request, receiver, abortion_callback, self._pool)
class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable): class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
@ -92,17 +93,17 @@ class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
self._method = method self._method = method
self._pool = pool self._pool = pool
def __call__(self, request, timeout, metadata=None): def __call__(self, request, timeout, metadata=None, protocol_options=None):
return _calls.inline_unary_stream( return _calls.inline_unary_stream(
self._end, self._group, self._method, timeout, metadata, self._end, self._group, self._method, timeout, protocol_options,
request) metadata, request)
def event( def event(
self, request, receiver, abortion_callback, timeout, self, request, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
return _calls.event_unary_stream( return _calls.event_unary_stream(
self._end, self._group, self._method, timeout, metadata, self._end, self._group, self._method, timeout, protocol_options,
request, receiver, abortion_callback, self._pool) metadata, request, receiver, abortion_callback, self._pool)
class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable): class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
@ -115,21 +116,23 @@ class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
def __call__( def __call__(
self, request_iterator, timeout, metadata=None, self, request_iterator, timeout, metadata=None,
with_call=False): with_call=False, protocol_options=None):
return _calls.blocking_stream_unary( return _calls.blocking_stream_unary(
self._end, self._group, self._method, timeout, with_call, self._end, self._group, self._method, timeout, with_call,
metadata, request_iterator, self._pool) protocol_options, metadata, request_iterator, self._pool)
def future(self, request_iterator, timeout, metadata=None): def future(
self, request_iterator, timeout, metadata=None, protocol_options=None):
return _calls.future_stream_unary( return _calls.future_stream_unary(
self._end, self._group, self._method, timeout, metadata, self._end, self._group, self._method, timeout, protocol_options,
request_iterator, self._pool) metadata, request_iterator, self._pool)
def event( def event(
self, receiver, abortion_callback, timeout, metadata=None): self, receiver, abortion_callback, timeout, metadata=None,
protocol_options=None):
return _calls.event_stream_unary( return _calls.event_stream_unary(
self._end, self._group, self._method, timeout, metadata, self._end, self._group, self._method, timeout, protocol_options,
receiver, abortion_callback, self._pool) metadata, receiver, abortion_callback, self._pool)
class _StreamStreamMultiCallable(face.StreamStreamMultiCallable): class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
@ -140,16 +143,18 @@ class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
self._method = method self._method = method
self._pool = pool self._pool = pool
def __call__(self, request_iterator, timeout, metadata=None): def __call__(
self, request_iterator, timeout, metadata=None, protocol_options=None):
return _calls.inline_stream_stream( return _calls.inline_stream_stream(
self._end, self._group, self._method, timeout, metadata, self._end, self._group, self._method, timeout, protocol_options,
request_iterator, self._pool) metadata, request_iterator, self._pool)
def event( def event(
self, receiver, abortion_callback, timeout, metadata=None): self, receiver, abortion_callback, timeout, metadata=None,
protocol_options=None):
return _calls.event_stream_stream( return _calls.event_stream_stream(
self._end, self._group, self._method, timeout, metadata, self._end, self._group, self._method, timeout, protocol_options,
receiver, abortion_callback, self._pool) metadata, receiver, abortion_callback, self._pool)
class _GenericStub(face.GenericStub): class _GenericStub(face.GenericStub):
@ -161,66 +166,70 @@ class _GenericStub(face.GenericStub):
def blocking_unary_unary( def blocking_unary_unary(
self, group, method, request, timeout, metadata=None, self, group, method, request, timeout, metadata=None,
with_call=None): with_call=None, protocol_options=None):
return _calls.blocking_unary_unary( return _calls.blocking_unary_unary(
self._end, group, method, timeout, with_call, metadata, self._end, group, method, timeout, with_call, protocol_options,
request) metadata, request)
def future_unary_unary( def future_unary_unary(
self, group, method, request, timeout, metadata=None): self, group, method, request, timeout, metadata=None,
protocol_options=None):
return _calls.future_unary_unary( return _calls.future_unary_unary(
self._end, group, method, timeout, metadata, request) self._end, group, method, timeout, protocol_options, metadata, request)
def inline_unary_stream( def inline_unary_stream(
self, group, method, request, timeout, metadata=None): self, group, method, request, timeout, metadata=None,
protocol_options=None):
return _calls.inline_unary_stream( return _calls.inline_unary_stream(
self._end, group, method, timeout, metadata, request) self._end, group, method, timeout, protocol_options, metadata, request)
def blocking_stream_unary( def blocking_stream_unary(
self, group, method, request_iterator, timeout, metadata=None, self, group, method, request_iterator, timeout, metadata=None,
with_call=None): with_call=None, protocol_options=None):
return _calls.blocking_stream_unary( return _calls.blocking_stream_unary(
self._end, group, method, timeout, with_call, metadata, self._end, group, method, timeout, with_call, protocol_options,
request_iterator, self._pool) metadata, request_iterator, self._pool)
def future_stream_unary( def future_stream_unary(
self, group, method, request_iterator, timeout, metadata=None): self, group, method, request_iterator, timeout, metadata=None,
protocol_options=None):
return _calls.future_stream_unary( return _calls.future_stream_unary(
self._end, group, method, timeout, metadata, self._end, group, method, timeout, protocol_options, metadata,
request_iterator, self._pool) request_iterator, self._pool)
def inline_stream_stream( def inline_stream_stream(
self, group, method, request_iterator, timeout, metadata=None): self, group, method, request_iterator, timeout, metadata=None,
protocol_options=None):
return _calls.inline_stream_stream( return _calls.inline_stream_stream(
self._end, group, method, timeout, metadata, self._end, group, method, timeout, protocol_options, metadata,
request_iterator, self._pool) request_iterator, self._pool)
def event_unary_unary( def event_unary_unary(
self, group, method, request, receiver, abortion_callback, timeout, self, group, method, request, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
return _calls.event_unary_unary( return _calls.event_unary_unary(
self._end, group, method, timeout, metadata, request, self._end, group, method, timeout, protocol_options, metadata, request,
receiver, abortion_callback, self._pool) receiver, abortion_callback, self._pool)
def event_unary_stream( def event_unary_stream(
self, group, method, request, receiver, abortion_callback, timeout, self, group, method, request, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
return _calls.event_unary_stream( return _calls.event_unary_stream(
self._end, group, method, timeout, metadata, request, self._end, group, method, timeout, protocol_options, metadata, request,
receiver, abortion_callback, self._pool) receiver, abortion_callback, self._pool)
def event_stream_unary( def event_stream_unary(
self, group, method, receiver, abortion_callback, timeout, self, group, method, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
return _calls.event_stream_unary( return _calls.event_stream_unary(
self._end, group, method, timeout, metadata, receiver, self._end, group, method, timeout, protocol_options, metadata, receiver,
abortion_callback, self._pool) abortion_callback, self._pool)
def event_stream_stream( def event_stream_stream(
self, group, method, receiver, abortion_callback, timeout, self, group, method, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
return _calls.event_stream_stream( return _calls.event_stream_stream(
self._end, group, method, timeout, metadata, receiver, self._end, group, method, timeout, protocol_options, metadata, receiver,
abortion_callback, self._pool) abortion_callback, self._pool)
def unary_unary(self, group, method): def unary_unary(self, group, method):

@ -274,7 +274,7 @@ class End(object):
@abc.abstractmethod @abc.abstractmethod
def operate( def operate(
self, group, method, subscription, timeout, initial_metadata=None, self, group, method, subscription, timeout, initial_metadata=None,
payload=None, completion=None): payload=None, completion=None, protocol_options=None):
"""Commences an operation. """Commences an operation.
Args: Args:
@ -290,6 +290,8 @@ class End(object):
payload: An initial payload for the operation. payload: An initial payload for the operation.
completion: A Completion value indicating the end of transmission to the completion: A Completion value indicating the end of transmission to the
other side of the operation. other side of the operation.
protocol_options: A value specified by the provider of a Base interface
implementation affording custom state and behavior.
Returns: Returns:
A pair of objects affording information about the operation and action A pair of objects affording information about the operation and action

@ -184,6 +184,16 @@ class RpcContext(object):
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod
def protocol_context(self):
"""Accesses a custom object specified by an implementation provider.
Returns:
A value specified by the provider of a Face interface implementation
affording custom state and behavior.
"""
raise NotImplementedError()
class Call(RpcContext): class Call(RpcContext):
"""Invocation-side utility object for an RPC.""" """Invocation-side utility object for an RPC."""
@ -354,7 +364,8 @@ class UnaryUnaryMultiCallable(object):
@abc.abstractmethod @abc.abstractmethod
def __call__( def __call__(
self, request, timeout, metadata=None, with_call=False): self, request, timeout, metadata=None, with_call=False,
protocol_options=None):
"""Synchronously invokes the underlying RPC. """Synchronously invokes the underlying RPC.
Args: Args:
@ -364,6 +375,8 @@ class UnaryUnaryMultiCallable(object):
the RPC. the RPC.
with_call: Whether or not to include return a Call for the RPC in addition with_call: Whether or not to include return a Call for the RPC in addition
to the reponse. to the reponse.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
The response value for the RPC, and a Call for the RPC if with_call was The response value for the RPC, and a Call for the RPC if with_call was
@ -375,7 +388,7 @@ class UnaryUnaryMultiCallable(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def future(self, request, timeout, metadata=None): def future(self, request, timeout, metadata=None, protocol_options=None):
"""Asynchronously invokes the underlying RPC. """Asynchronously invokes the underlying RPC.
Args: Args:
@ -383,6 +396,8 @@ class UnaryUnaryMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of metadata: A metadata value to be passed to the service-side of
the RPC. the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
An object that is both a Call for the RPC and a future.Future. In the An object that is both a Call for the RPC and a future.Future. In the
@ -395,7 +410,7 @@ class UnaryUnaryMultiCallable(object):
@abc.abstractmethod @abc.abstractmethod
def event( def event(
self, request, receiver, abortion_callback, timeout, self, request, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
"""Asynchronously invokes the underlying RPC. """Asynchronously invokes the underlying RPC.
Args: Args:
@ -406,6 +421,8 @@ class UnaryUnaryMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of metadata: A metadata value to be passed to the service-side of
the RPC. the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
A Call for the RPC. A Call for the RPC.
@ -418,7 +435,7 @@ class UnaryStreamMultiCallable(object):
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
@abc.abstractmethod @abc.abstractmethod
def __call__(self, request, timeout, metadata=None): def __call__(self, request, timeout, metadata=None, protocol_options=None):
"""Invokes the underlying RPC. """Invokes the underlying RPC.
Args: Args:
@ -426,6 +443,8 @@ class UnaryStreamMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of metadata: A metadata value to be passed to the service-side of
the RPC. the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
An object that is both a Call for the RPC and an iterator of response An object that is both a Call for the RPC and an iterator of response
@ -437,7 +456,7 @@ class UnaryStreamMultiCallable(object):
@abc.abstractmethod @abc.abstractmethod
def event( def event(
self, request, receiver, abortion_callback, timeout, self, request, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
"""Asynchronously invokes the underlying RPC. """Asynchronously invokes the underlying RPC.
Args: Args:
@ -448,6 +467,8 @@ class UnaryStreamMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of metadata: A metadata value to be passed to the service-side of
the RPC. the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
A Call object for the RPC. A Call object for the RPC.
@ -462,7 +483,7 @@ class StreamUnaryMultiCallable(object):
@abc.abstractmethod @abc.abstractmethod
def __call__( def __call__(
self, request_iterator, timeout, metadata=None, self, request_iterator, timeout, metadata=None,
with_call=False): with_call=False, protocol_options=None):
"""Synchronously invokes the underlying RPC. """Synchronously invokes the underlying RPC.
Args: Args:
@ -472,6 +493,8 @@ class StreamUnaryMultiCallable(object):
the RPC. the RPC.
with_call: Whether or not to include return a Call for the RPC in addition with_call: Whether or not to include return a Call for the RPC in addition
to the reponse. to the reponse.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
The response value for the RPC, and a Call for the RPC if with_call was The response value for the RPC, and a Call for the RPC if with_call was
@ -483,7 +506,8 @@ class StreamUnaryMultiCallable(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def future(self, request_iterator, timeout, metadata=None): def future(
self, request_iterator, timeout, metadata=None, protocol_options=None):
"""Asynchronously invokes the underlying RPC. """Asynchronously invokes the underlying RPC.
Args: Args:
@ -491,6 +515,8 @@ class StreamUnaryMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of metadata: A metadata value to be passed to the service-side of
the RPC. the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
An object that is both a Call for the RPC and a future.Future. In the An object that is both a Call for the RPC and a future.Future. In the
@ -502,7 +528,8 @@ class StreamUnaryMultiCallable(object):
@abc.abstractmethod @abc.abstractmethod
def event( def event(
self, receiver, abortion_callback, timeout, metadata=None): self, receiver, abortion_callback, timeout, metadata=None,
protocol_options=None):
"""Asynchronously invokes the underlying RPC. """Asynchronously invokes the underlying RPC.
Args: Args:
@ -512,6 +539,8 @@ class StreamUnaryMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of metadata: A metadata value to be passed to the service-side of
the RPC. the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
A single object that is both a Call object for the RPC and a A single object that is both a Call object for the RPC and a
@ -525,7 +554,8 @@ class StreamStreamMultiCallable(object):
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
@abc.abstractmethod @abc.abstractmethod
def __call__(self, request_iterator, timeout, metadata=None): def __call__(
self, request_iterator, timeout, metadata=None, protocol_options=None):
"""Invokes the underlying RPC. """Invokes the underlying RPC.
Args: Args:
@ -533,6 +563,8 @@ class StreamStreamMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of metadata: A metadata value to be passed to the service-side of
the RPC. the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
An object that is both a Call for the RPC and an iterator of response An object that is both a Call for the RPC and an iterator of response
@ -543,7 +575,8 @@ class StreamStreamMultiCallable(object):
@abc.abstractmethod @abc.abstractmethod
def event( def event(
self, receiver, abortion_callback, timeout, metadata=None): self, receiver, abortion_callback, timeout, metadata=None,
protocol_options=None):
"""Asynchronously invokes the underlying RPC. """Asynchronously invokes the underlying RPC.
Args: Args:
@ -553,6 +586,8 @@ class StreamStreamMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of metadata: A metadata value to be passed to the service-side of
the RPC. the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
A single object that is both a Call object for the RPC and a A single object that is both a Call object for the RPC and a
@ -646,7 +681,7 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def blocking_unary_unary( def blocking_unary_unary(
self, group, method, request, timeout, metadata=None, self, group, method, request, timeout, metadata=None,
with_call=False): with_call=False, protocol_options=None):
"""Invokes a unary-request-unary-response method. """Invokes a unary-request-unary-response method.
This method blocks until either returning the response value of the RPC This method blocks until either returning the response value of the RPC
@ -661,6 +696,8 @@ class GenericStub(object):
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
with_call: Whether or not to include return a Call for the RPC in addition with_call: Whether or not to include return a Call for the RPC in addition
to the reponse. to the reponse.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
The response value for the RPC, and a Call for the RPC if with_call was The response value for the RPC, and a Call for the RPC if with_call was
@ -673,7 +710,8 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def future_unary_unary( def future_unary_unary(
self, group, method, request, timeout, metadata=None): self, group, method, request, timeout, metadata=None,
protocol_options=None):
"""Invokes a unary-request-unary-response method. """Invokes a unary-request-unary-response method.
Args: Args:
@ -682,6 +720,8 @@ class GenericStub(object):
request: The request value for the RPC. request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
An object that is both a Call for the RPC and a future.Future. In the An object that is both a Call for the RPC and a future.Future. In the
@ -693,7 +733,8 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def inline_unary_stream( def inline_unary_stream(
self, group, method, request, timeout, metadata=None): self, group, method, request, timeout, metadata=None,
protocol_options=None):
"""Invokes a unary-request-stream-response method. """Invokes a unary-request-stream-response method.
Args: Args:
@ -702,6 +743,8 @@ class GenericStub(object):
request: The request value for the RPC. request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
An object that is both a Call for the RPC and an iterator of response An object that is both a Call for the RPC and an iterator of response
@ -713,7 +756,7 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def blocking_stream_unary( def blocking_stream_unary(
self, group, method, request_iterator, timeout, metadata=None, self, group, method, request_iterator, timeout, metadata=None,
with_call=False): with_call=False, protocol_options=None):
"""Invokes a stream-request-unary-response method. """Invokes a stream-request-unary-response method.
This method blocks until either returning the response value of the RPC This method blocks until either returning the response value of the RPC
@ -728,6 +771,8 @@ class GenericStub(object):
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
with_call: Whether or not to include return a Call for the RPC in addition with_call: Whether or not to include return a Call for the RPC in addition
to the reponse. to the reponse.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
The response value for the RPC, and a Call for the RPC if with_call was The response value for the RPC, and a Call for the RPC if with_call was
@ -740,7 +785,8 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def future_stream_unary( def future_stream_unary(
self, group, method, request_iterator, timeout, metadata=None): self, group, method, request_iterator, timeout, metadata=None,
protocol_options=None):
"""Invokes a stream-request-unary-response method. """Invokes a stream-request-unary-response method.
Args: Args:
@ -749,6 +795,8 @@ class GenericStub(object):
request_iterator: An iterator that yields request values for the RPC. request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
An object that is both a Call for the RPC and a future.Future. In the An object that is both a Call for the RPC and a future.Future. In the
@ -760,7 +808,8 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def inline_stream_stream( def inline_stream_stream(
self, group, method, request_iterator, timeout, metadata=None): self, group, method, request_iterator, timeout, metadata=None,
protocol_options=None):
"""Invokes a stream-request-stream-response method. """Invokes a stream-request-stream-response method.
Args: Args:
@ -769,6 +818,8 @@ class GenericStub(object):
request_iterator: An iterator that yields request values for the RPC. request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
An object that is both a Call for the RPC and an iterator of response An object that is both a Call for the RPC and an iterator of response
@ -780,7 +831,7 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def event_unary_unary( def event_unary_unary(
self, group, method, request, receiver, abortion_callback, timeout, self, group, method, request, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
"""Event-driven invocation of a unary-request-unary-response method. """Event-driven invocation of a unary-request-unary-response method.
Args: Args:
@ -792,6 +843,8 @@ class GenericStub(object):
in the event of RPC abortion. in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
A Call for the RPC. A Call for the RPC.
@ -801,7 +854,7 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def event_unary_stream( def event_unary_stream(
self, group, method, request, receiver, abortion_callback, timeout, self, group, method, request, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
"""Event-driven invocation of a unary-request-stream-response method. """Event-driven invocation of a unary-request-stream-response method.
Args: Args:
@ -813,6 +866,8 @@ class GenericStub(object):
in the event of RPC abortion. in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
A Call for the RPC. A Call for the RPC.
@ -822,7 +877,7 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def event_stream_unary( def event_stream_unary(
self, group, method, receiver, abortion_callback, timeout, self, group, method, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
"""Event-driven invocation of a unary-request-unary-response method. """Event-driven invocation of a unary-request-unary-response method.
Args: Args:
@ -833,6 +888,8 @@ class GenericStub(object):
in the event of RPC abortion. in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the A pair of a Call object for the RPC and a stream.Consumer to which the
@ -843,7 +900,7 @@ class GenericStub(object):
@abc.abstractmethod @abc.abstractmethod
def event_stream_stream( def event_stream_stream(
self, group, method, receiver, abortion_callback, timeout, self, group, method, receiver, abortion_callback, timeout,
metadata=None): metadata=None, protocol_options=None):
"""Event-driven invocation of a unary-request-stream-response method. """Event-driven invocation of a unary-request-stream-response method.
Args: Args:
@ -854,6 +911,8 @@ class GenericStub(object):
in the event of RPC abortion. in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC. timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
Returns: Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the A pair of a Call object for the RPC and a stream.Consumer to which the

@ -34,14 +34,13 @@ import collections
import enum import enum
class Transport(collections.namedtuple('Transport', ('kind', 'value',))): class Protocol(collections.namedtuple('Protocol', ('kind', 'value',))):
"""A sum type for handles to an underlying transport system. """A sum type for handles to a system that transmits tickets.
Attributes: Attributes:
kind: A Kind value identifying the kind of value being passed to or from kind: A Kind value identifying the kind of value being passed.
the underlying transport. value: The value being passed between the high-level application and the
value: The value being passed through RPC Framework between the high-level system affording ticket transport.
application and the underlying transport.
""" """
@enum.unique @enum.unique
@ -56,8 +55,7 @@ class Ticket(
'Ticket', 'Ticket',
('operation_id', 'sequence_number', 'group', 'method', 'subscription', ('operation_id', 'sequence_number', 'group', 'method', 'subscription',
'timeout', 'allowance', 'initial_metadata', 'payload', 'timeout', 'allowance', 'initial_metadata', 'payload',
'terminal_metadata', 'code', 'message', 'termination', 'terminal_metadata', 'code', 'message', 'termination', 'protocol',))):
'transport',))):
"""A sum type for all values sent from a front to a back. """A sum type for all values sent from a front to a back.
Attributes: Attributes:
@ -99,8 +97,8 @@ class Ticket(
termination: A Termination value describing the end of the operation, or termination: A Termination value describing the end of the operation, or
None if the operation has not yet terminated. If set, no further tickets None if the operation has not yet terminated. If set, no further tickets
may be sent in the same direction. may be sent in the same direction.
transport: A Transport value or None, with further semantics being a matter protocol: A Protocol value or None, with further semantics being a matter
between high-level application and underlying transport. between high-level application and underlying ticket transport.
""" """
@enum.unique @enum.unique

Loading…
Cancel
Save