The RPC Framework crust package

pull/3052/head
Nathaniel Manista 9 years ago
parent 1f1919c8cf
commit 4c8288ec01
  1. 30
      src/python/grpcio/grpc/framework/crust/__init__.py
  2. 204
      src/python/grpcio/grpc/framework/crust/_calls.py
  3. 545
      src/python/grpcio/grpc/framework/crust/_control.py
  4. 166
      src/python/grpcio/grpc/framework/crust/_service.py
  5. 352
      src/python/grpcio/grpc/framework/crust/implementations.py
  6. 2
      src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
  7. 160
      src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
  8. 111
      src/python/grpcio_test/grpc_test/framework/_crust_over_core_face_interface_test.py
  9. 37
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_3069_test_constant.py
  10. 9
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
  11. 11
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
  12. 17
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
  13. 2
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py
  14. 2
      tools/run_tests/run_python.sh

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,204 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utility functions for invoking RPCs."""
from grpc.framework.crust import _control
from grpc.framework.interfaces.base import utilities
from grpc.framework.interfaces.face import face
_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
_EMPTY_COMPLETION = utilities.completion(None, None, None)
def _invoke(end, group, method, timeout, initial_metadata, payload, complete):
rendezvous = _control.Rendezvous(None, None)
operation_context, operator = end.operate(
group, method, utilities.full_subscription(rendezvous), timeout,
initial_metadata=initial_metadata, payload=payload,
completion=_EMPTY_COMPLETION if complete else None)
rendezvous.set_operator_and_context(operator, operation_context)
outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is not None:
rendezvous.set_outcome(outcome)
return rendezvous, operation_context, outcome
def _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
if outcome is None:
def in_pool():
abortion = rendezvous.add_abortion_callback(abortion_callback)
if abortion is None:
try:
receiver.initial_metadata(rendezvous.initial_metadata())
receiver.response(next(rendezvous))
receiver.complete(
rendezvous.terminal_metadata(), rendezvous.code(),
rendezvous.details())
except face.AbortionError:
pass
else:
abortion_callback(abortion)
pool.submit(_control.pool_wrap(in_pool, operation_context))
return rendezvous
def _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
if outcome is None:
def in_pool():
abortion = rendezvous.add_abortion_callback(abortion_callback)
if abortion is None:
try:
receiver.initial_metadata(rendezvous.initial_metadata())
for response in rendezvous:
receiver.response(response)
receiver.complete(
rendezvous.terminal_metadata(), rendezvous.code(),
rendezvous.details())
except face.AbortionError:
pass
else:
abortion_callback(abortion)
pool.submit(_control.pool_wrap(in_pool, operation_context))
return rendezvous
def blocking_unary_unary(
end, group, method, timeout, with_call, initial_metadata, payload):
"""Services in a blocking fashion a unary-unary servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
if with_call:
return next(rendezvous, rendezvous)
else:
return next(rendezvous)
def future_unary_unary(end, group, method, timeout, initial_metadata, payload):
"""Services a value-in value-out servicer method by returning a Future."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
return rendezvous
def inline_unary_stream(end, group, method, timeout, initial_metadata, payload):
"""Services a value-in stream-out servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
return rendezvous
def blocking_stream_unary(
end, group, method, timeout, with_call, initial_metadata, payload_iterator,
pool):
"""Services in a blocking fashion a stream-in value-out servicer method."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
rendezvous.consume(payload)
rendezvous.terminate()
pool.submit(_control.pool_wrap(in_pool, operation_context))
if with_call:
return next(rendezvous), rendezvous
else:
return next(rendezvous)
else:
if with_call:
return next(rendezvous), rendezvous
else:
return next(rendezvous)
def future_stream_unary(
end, group, method, timeout, initial_metadata, payload_iterator, pool):
"""Services a stream-in value-out servicer method by returning a Future."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
rendezvous.consume(payload)
rendezvous.terminate()
pool.submit(_control.pool_wrap(in_pool, operation_context))
return rendezvous
def inline_stream_stream(
end, group, method, timeout, initial_metadata, payload_iterator, pool):
"""Services a stream-in stream-out servicer method."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
rendezvous.consume(payload)
rendezvous.terminate()
pool.submit(_control.pool_wrap(in_pool, operation_context))
return rendezvous
def event_unary_unary(
end, group, method, timeout, initial_metadata, payload, receiver,
abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_unary_stream(
end, group, method, timeout, initial_metadata, payload,
receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_stream_unary(
end, group, method, timeout, initial_metadata, receiver, abortion_callback,
pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_stream_stream(
end, group, method, timeout, initial_metadata, receiver, abortion_callback,
pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)

@ -0,0 +1,545 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior for translating between sync and async control flow."""
import collections
import enum
import sys
import threading
import time
from grpc.framework.foundation import abandonment
from grpc.framework.foundation import callable_util
from grpc.framework.foundation import future
from grpc.framework.foundation import stream
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.base import utilities
from grpc.framework.interfaces.face import face
_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!'
_INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Crust) Internal Error! )-:'
_CANNOT_SET_INITIAL_METADATA = (
'Could not set initial metadata - has it already been set, or has a ' +
'payload already been sent?')
_CANNOT_SET_TERMINAL_METADATA = (
'Could not set terminal metadata - has it already been set, or has RPC ' +
'completion already been indicated?')
_CANNOT_SET_CODE = (
'Could not set code - has it already been set, or has RPC completion ' +
'already been indicated?')
_CANNOT_SET_DETAILS = (
'Could not set details - has it already been set, or has RPC completion ' +
'already been indicated?')
class _DummyOperator(base.Operator):
def advance(
self, initial_metadata=None, payload=None, completion=None,
allowance=None):
pass
_DUMMY_OPERATOR = _DummyOperator()
class _Awaited(
collections.namedtuple('_Awaited', ('kind', 'value',))):
@enum.unique
class Kind(enum.Enum):
NOT_YET_ARRIVED = 'not yet arrived'
ARRIVED = 'arrived'
_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None)
_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None)
class _Transitory(
collections.namedtuple('_Transitory', ('kind', 'value',))):
@enum.unique
class Kind(enum.Enum):
NOT_YET_SEEN = 'not yet seen'
PRESENT = 'present'
GONE = 'gone'
_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None)
_GONE = _Transitory(_Transitory.Kind.GONE, None)
class _Termination(
collections.namedtuple(
'_Termination', ('terminated', 'abortion', 'abortion_error',))):
"""Values indicating whether and how an RPC has terminated.
Attributes:
terminated: A boolean indicating whether or not the RPC has terminated.
abortion: A face.Abortion value describing the RPC's abortion or None if the
RPC did not abort.
abortion_error: A face.AbortionError describing the RPC's abortion or None
if the RPC did not abort.
"""
_NOT_TERMINATED = _Termination(False, None, None)
_OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR = {
base.Outcome.COMPLETED: lambda *unused_args: _Termination(True, None, None),
base.Outcome.CANCELLED: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.CANCELLED, *args),
face.CancellationError(*args)),
base.Outcome.EXPIRED: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.EXPIRED, *args),
face.ExpirationError(*args)),
base.Outcome.LOCAL_SHUTDOWN: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args),
face.LocalShutdownError(*args)),
base.Outcome.REMOTE_SHUTDOWN: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args),
face.RemoteShutdownError(*args)),
base.Outcome.RECEPTION_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
face.NetworkError(*args)),
base.Outcome.TRANSMISSION_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
face.NetworkError(*args)),
base.Outcome.LOCAL_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args),
face.LocalError(*args)),
base.Outcome.REMOTE_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args),
face.RemoteError(*args)),
}
def _wait_once_until(condition, until):
if until is None:
condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
raise future.TimeoutError()
else:
condition.wait(timeout=remaining)
def _done_callback_as_operation_termination_callback(
done_callback, rendezvous):
def operation_termination_callback(operation_outcome):
rendezvous.set_outcome(operation_outcome)
done_callback(rendezvous)
return operation_termination_callback
def _abortion_callback_as_operation_termination_callback(
rpc_abortion_callback, rendezvous_set_outcome):
def operation_termination_callback(operation_outcome):
termination = rendezvous_set_outcome(operation_outcome)
if termination.abortion is not None:
rpc_abortion_callback(termination.abortion)
return operation_termination_callback
class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
"""A rendez-vous for the threads of an operation.
Instances of this object present iterator and stream.Consumer interfaces for
interacting with application code and present a base.Operator interface and
maintain a base.Operator internally for interacting with base interface code.
"""
def __init__(self, operator, operation_context):
self._condition = threading.Condition()
self._operator = operator
self._operation_context = operation_context
self._up_initial_metadata = _NOT_YET_ARRIVED
self._up_payload = None
self._up_allowance = 1
self._up_completion = _NOT_YET_ARRIVED
self._down_initial_metadata = _NOT_YET_SEEN
self._down_payload = None
self._down_allowance = 1
self._down_terminal_metadata = _NOT_YET_SEEN
self._down_code = _NOT_YET_SEEN
self._down_details = _NOT_YET_SEEN
self._termination = _NOT_TERMINATED
# The semantics of future.Future.cancel and future.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
# result of the RPC. This field tracks whether cancellation was requested
# prior to termination of the RPC
self._cancelled = False
def set_operator_and_context(self, operator, operation_context):
with self._condition:
self._operator = operator
self._operation_context = operation_context
def _down_completion(self):
if self._down_terminal_metadata.kind is _Transitory.Kind.NOT_YET_SEEN:
terminal_metadata = None
self._down_terminal_metadata = _GONE
elif self._down_terminal_metadata.kind is _Transitory.Kind.PRESENT:
terminal_metadata = self._down_terminal_metadata.value
self._down_terminal_metadata = _GONE
else:
terminal_metadata = None
if self._down_code.kind is _Transitory.Kind.NOT_YET_SEEN:
code = None
self._down_code = _GONE
elif self._down_code.kind is _Transitory.Kind.PRESENT:
code = self._down_code.value
self._down_code = _GONE
else:
code = None
if self._down_details.kind is _Transitory.Kind.NOT_YET_SEEN:
details = None
self._down_details = _GONE
elif self._down_details.kind is _Transitory.Kind.PRESENT:
details = self._down_details.value
self._down_details = _GONE
else:
details = None
return utilities.completion(terminal_metadata, code, details)
def _set_outcome(self, outcome):
if not self._termination.terminated:
self._operator = _DUMMY_OPERATOR
self._operation_context = None
self._down_initial_metadata = _GONE
self._down_payload = None
self._down_terminal_metadata = _GONE
self._down_code = _GONE
self._down_details = _GONE
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
initial_metadata = None
else:
initial_metadata = self._up_initial_metadata.value
if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
terminal_metadata, code, details = None, None, None
else:
terminal_metadata = self._up_completion.value.terminal_metadata
code = self._up_completion.value.code
details = self._up_completion.value.message
self._termination = _OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR[
outcome](initial_metadata, terminal_metadata, code, details)
self._condition.notify_all()
return self._termination
def advance(
self, initial_metadata=None, payload=None, completion=None,
allowance=None):
with self._condition:
if initial_metadata is not None:
self._up_initial_metadata = _Awaited(
_Awaited.Kind.ARRIVED, initial_metadata)
if payload is not None:
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
self._up_initial_metadata = _ARRIVED_AND_NONE
self._up_payload = payload
self._up_allowance -= 1
if completion is not None:
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
self._up_initial_metadata = _ARRIVED_AND_NONE
self._up_completion = _Awaited(
_Awaited.Kind.ARRIVED, completion)
if allowance is not None:
if self._down_payload is not None:
self._operator.advance(payload=self._down_payload)
self._down_payload = None
self._down_allowance += allowance - 1
else:
self._down_allowance += allowance
self._condition.notify_all()
def cancel(self):
with self._condition:
if self._operation_context is not None:
self._operation_context.cancel()
self._cancelled = True
return False
def cancelled(self):
with self._condition:
return self._cancelled
def running(self):
with self._condition:
return not self._termination.terminated
def done(self):
with self._condition:
return self._termination.terminated
def result(self, timeout=None):
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._termination.terminated:
if self._termination.abortion is None:
return self._up_payload
elif self._termination.abortion.kind is face.Abortion.Kind.CANCELLED:
raise future.CancelledError()
else:
raise self._termination.abortion_error # pylint: disable=raising-bad-type
else:
_wait_once_until(self._condition, until)
def exception(self, timeout=None):
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._termination.terminated:
if self._termination.abortion is None:
return None
else:
return self._termination.abortion_error
else:
_wait_once_until(self._condition, until)
def traceback(self, timeout=None):
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._termination.terminated:
if self._termination.abortion_error is None:
return None
else:
abortion_error = self._termination.abortion_error
break
else:
_wait_once_until(self._condition, until)
try:
raise abortion_error
except face.AbortionError:
return sys.exc_info()[2]
def add_done_callback(self, fn):
with self._condition:
if self._operation_context is not None:
outcome = self._operation_context.add_termination_callback(
_done_callback_as_operation_termination_callback(fn, self))
if outcome is None:
return
else:
self._set_outcome(outcome)
fn(self)
def consume(self, value):
with self._condition:
while True:
if self._termination.terminated:
return
elif 0 < self._down_allowance:
self._operator.advance(payload=value)
self._down_allowance -= 1
return
else:
self._condition.wait()
def terminate(self):
with self._condition:
if self._termination.terminated:
return
elif self._down_code.kind is _Transitory.Kind.GONE:
# Conform to specified idempotence of terminate by ignoring extra calls.
return
else:
completion = self._down_completion()
self._operator.advance(completion=completion)
def consume_and_terminate(self, value):
with self._condition:
while True:
if self._termination.terminated:
return
elif 0 < self._down_allowance:
completion = self._down_completion()
self._operator.advance(payload=value, completion=completion)
return
else:
self._condition.wait()
def __iter__(self):
return self
def next(self):
with self._condition:
while True:
if self._termination.abortion_error is not None:
raise self._termination.abortion_error
elif self._up_payload is not None:
payload = self._up_payload
self._up_payload = None
if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
self._operator.advance(allowance=1)
return payload
elif self._up_completion.kind is _Awaited.Kind.ARRIVED:
raise StopIteration()
else:
self._condition.wait()
def is_active(self):
with self._condition:
return not self._termination.terminated
def time_remaining(self):
if self._operation_context is None:
return 0
else:
return self._operation_context.time_remaining()
def add_abortion_callback(self, abortion_callback):
with self._condition:
if self._operation_context is None:
return self._termination.abortion
else:
outcome = self._operation_context.add_termination_callback(
_abortion_callback_as_operation_termination_callback(
abortion_callback, self.set_outcome))
if outcome is not None:
return self._set_outcome(outcome).abortion
else:
return self._termination.abortion
def initial_metadata(self):
with self._condition:
while True:
if self._up_initial_metadata.kind is _Awaited.Kind.ARRIVED:
return self._up_initial_metadata.value
elif self._termination.terminated:
return None
else:
self._condition.wait()
def terminal_metadata(self):
with self._condition:
while True:
if self._up_completion.kind is _Awaited.Kind.ARRIVED:
return self._up_completion.value.terminal_metadata
elif self._termination.terminated:
return None
else:
self._condition.wait()
def code(self):
with self._condition:
while True:
if self._up_completion.kind is _Awaited.Kind.ARRIVED:
return self._up_completion.value.code
elif self._termination.terminated:
return None
else:
self._condition.wait()
def details(self):
with self._condition:
while True:
if self._up_completion.kind is _Awaited.Kind.ARRIVED:
return self._up_completion.value.message
elif self._termination.terminated:
return None
else:
self._condition.wait()
def set_initial_metadata(self, initial_metadata):
with self._condition:
if (self._down_initial_metadata.kind is not
_Transitory.Kind.NOT_YET_SEEN):
raise ValueError(_CANNOT_SET_INITIAL_METADATA)
else:
self._down_initial_metadata = _GONE
self._operator.advance(initial_metadata=initial_metadata)
def set_terminal_metadata(self, terminal_metadata):
with self._condition:
if (self._down_terminal_metadata.kind is not
_Transitory.Kind.NOT_YET_SEEN):
raise ValueError(_CANNOT_SET_TERMINAL_METADATA)
else:
self._down_terminal_metadata = _Transitory(
_Transitory.Kind.PRESENT, terminal_metadata)
def set_code(self, code):
with self._condition:
if self._down_code.kind is not _Transitory.Kind.NOT_YET_SEEN:
raise ValueError(_CANNOT_SET_CODE)
else:
self._down_code = _Transitory(_Transitory.Kind.PRESENT, code)
def set_details(self, details):
with self._condition:
if self._down_details.kind is not _Transitory.Kind.NOT_YET_SEEN:
raise ValueError(_CANNOT_SET_DETAILS)
else:
self._down_details = _Transitory(_Transitory.Kind.PRESENT, details)
def set_outcome(self, outcome):
with self._condition:
return self._set_outcome(outcome)
def pool_wrap(behavior, operation_context):
"""Wraps an operation-related behavior so that it may be called in a pool.
Args:
behavior: A callable related to carrying out an operation.
operation_context: A base_interfaces.OperationContext for the operation.
Returns:
A callable that when called carries out the behavior of the given callable
and handles whatever exceptions it raises appropriately.
"""
def translation(*args):
try:
behavior(*args)
except (
abandonment.Abandoned,
face.CancellationError,
face.ExpirationError,
face.LocalShutdownError,
face.RemoteShutdownError,
face.NetworkError,
face.RemoteError,
) as e:
if operation_context.outcome() is None:
operation_context.fail(e)
except Exception as e:
operation_context.fail(e)
return callable_util.with_exceptions_logged(
translation, _INTERNAL_ERROR_LOG_MESSAGE)

@ -0,0 +1,166 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Behaviors for servicing RPCs."""
from grpc.framework.crust import _control
from grpc.framework.foundation import abandonment
from grpc.framework.interfaces.base import utilities
from grpc.framework.interfaces.face import face
class _ServicerContext(face.ServicerContext):
def __init__(self, rendezvous):
self._rendezvous = rendezvous
def is_active(self):
return self._rendezvous.is_active()
def time_remaining(self):
return self._rendezvous.time_remaining()
def add_abortion_callback(self, abortion_callback):
return self._rendezvous.add_abortion_callback(abortion_callback)
def cancel(self):
self._rendezvous.cancel()
def invocation_metadata(self):
return self._rendezvous.initial_metadata()
def initial_metadata(self, initial_metadata):
self._rendezvous.set_initial_metadata(initial_metadata)
def terminal_metadata(self, terminal_metadata):
self._rendezvous.set_terminal_metadata(terminal_metadata)
def code(self, code):
self._rendezvous.set_code(code)
def details(self, details):
self._rendezvous.set_details(details)
def _adaptation(pool, in_pool):
def adaptation(operator, operation_context):
rendezvous = _control.Rendezvous(operator, operation_context)
outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is None:
pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous)
return utilities.full_subscription(rendezvous)
else:
raise abandonment.Abandoned()
return adaptation
def adapt_inline_unary_unary(method, pool):
def in_pool(rendezvous):
request = next(rendezvous)
response = method(request, _ServicerContext(rendezvous))
rendezvous.consume_and_terminate(response)
return _adaptation(pool, in_pool)
def adapt_inline_unary_stream(method, pool):
def in_pool(rendezvous):
request = next(rendezvous)
response_iterator = method(request, _ServicerContext(rendezvous))
for response in response_iterator:
rendezvous.consume(response)
rendezvous.terminate()
return _adaptation(pool, in_pool)
def adapt_inline_stream_unary(method, pool):
def in_pool(rendezvous):
response = method(rendezvous, _ServicerContext(rendezvous))
rendezvous.consume_and_terminate(response)
return _adaptation(pool, in_pool)
def adapt_inline_stream_stream(method, pool):
def in_pool(rendezvous):
response_iterator = method(rendezvous, _ServicerContext(rendezvous))
for response in response_iterator:
rendezvous.consume(response)
rendezvous.terminate()
return _adaptation(pool, in_pool)
def adapt_event_unary_unary(method, pool):
def in_pool(rendezvous):
request = next(rendezvous)
method(
request, rendezvous.consume_and_terminate, _ServicerContext(rendezvous))
return _adaptation(pool, in_pool)
def adapt_event_unary_stream(method, pool):
def in_pool(rendezvous):
request = next(rendezvous)
method(request, rendezvous, _ServicerContext(rendezvous))
return _adaptation(pool, in_pool)
def adapt_event_stream_unary(method, pool):
def in_pool(rendezvous):
request_consumer = method(
rendezvous.consume_and_terminate, _ServicerContext(rendezvous))
for request in rendezvous:
request_consumer.consume(request)
request_consumer.terminate()
return _adaptation(pool, in_pool)
def adapt_event_stream_stream(method, pool):
def in_pool(rendezvous):
request_consumer = method(rendezvous, _ServicerContext(rendezvous))
for request in rendezvous:
request_consumer.consume(request)
request_consumer.terminate()
return _adaptation(pool, in_pool)
def adapt_multi_method(multi_method, pool):
def adaptation(group, method, operator, operation_context):
rendezvous = _control.Rendezvous(operator, operation_context)
outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is None:
def in_pool():
request_consumer = multi_method(
group, method, rendezvous, _ServicerContext(rendezvous))
for request in rendezvous:
request_consumer.consume(request)
request_consumer.terminate()
pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous)
return utilities.full_subscription(rendezvous)
else:
raise abandonment.Abandoned()
return adaptation

@ -0,0 +1,352 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into the Crust layer of RPC Framework."""
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.crust import _calls
from grpc.framework.crust import _service
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.face import face
class _BaseServicer(base.Servicer):
def __init__(self, adapted_methods, adapted_multi_method):
self._adapted_methods = adapted_methods
self._adapted_multi_method = adapted_multi_method
def service(self, group, method, context, output_operator):
adapted_method = self._adapted_methods.get((group, method), None)
if adapted_method is not None:
return adapted_method(output_operator, context)
elif self._adapted_multi_method is not None:
try:
return self._adapted_multi_method.service(
group, method, output_operator, context)
except face.NoSuchMethodError:
raise base.NoSuchMethodError()
else:
raise base.NoSuchMethodError()
class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
def __init__(self, end, group, method, pool):
self._end = end
self._group = group
self._method = method
self._pool = pool
def __call__(
self, request, timeout, metadata=None, with_call=False):
return _calls.blocking_unary_unary(
self._end, self._group, self._method, timeout, with_call,
metadata, request)
def future(self, request, timeout, metadata=None):
return _calls.future_unary_unary(
self._end, self._group, self._method, timeout, metadata,
request)
def event(
self, request, receiver, abortion_callback, timeout,
metadata=None):
return _calls.event_unary_unary(
self._end, self._group, self._method, timeout, metadata,
request, receiver, abortion_callback, self._pool)
class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
def __init__(self, end, group, method, pool):
self._end = end
self._group = group
self._method = method
self._pool = pool
def __call__(self, request, timeout, metadata=None):
return _calls.inline_unary_stream(
self._end, self._group, self._method, timeout, metadata,
request)
def event(
self, request, receiver, abortion_callback, timeout,
metadata=None):
return _calls.event_unary_stream(
self._end, self._group, self._method, timeout, metadata,
request, receiver, abortion_callback, self._pool)
class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
def __init__(self, end, group, method, pool):
self._end = end
self._group = group
self._method = method
self._pool = pool
def __call__(
self, request_iterator, timeout, metadata=None,
with_call=False):
return _calls.blocking_stream_unary(
self._end, self._group, self._method, timeout, with_call,
metadata, request_iterator, self._pool)
def future(self, request_iterator, timeout, metadata=None):
return _calls.future_stream_unary(
self._end, self._group, self._method, timeout, metadata,
request_iterator, self._pool)
def event(
self, receiver, abortion_callback, timeout, metadata=None):
return _calls.event_stream_unary(
self._end, self._group, self._method, timeout, metadata,
receiver, abortion_callback, self._pool)
class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
def __init__(self, end, group, method, pool):
self._end = end
self._group = group
self._method = method
self._pool = pool
def __call__(self, request_iterator, timeout, metadata=None):
return _calls.inline_stream_stream(
self._end, self._group, self._method, timeout, metadata,
request_iterator, self._pool)
def event(
self, receiver, abortion_callback, timeout, metadata=None):
return _calls.event_stream_stream(
self._end, self._group, self._method, timeout, metadata,
receiver, abortion_callback, self._pool)
class _GenericStub(face.GenericStub):
"""An face.GenericStub implementation."""
def __init__(self, end, pool):
self._end = end
self._pool = pool
def blocking_unary_unary(
self, group, method, request, timeout, metadata=None,
with_call=None):
return _calls.blocking_unary_unary(
self._end, group, method, timeout, with_call, metadata,
request)
def future_unary_unary(
self, group, method, request, timeout, metadata=None):
return _calls.future_unary_unary(
self._end, group, method, timeout, metadata, request)
def inline_unary_stream(
self, group, method, request, timeout, metadata=None):
return _calls.inline_unary_stream(
self._end, group, method, timeout, metadata, request)
def blocking_stream_unary(
self, group, method, request_iterator, timeout, metadata=None,
with_call=None):
return _calls.blocking_stream_unary(
self._end, group, method, timeout, with_call, metadata,
request_iterator, self._pool)
def future_stream_unary(
self, group, method, request_iterator, timeout, metadata=None):
return _calls.future_stream_unary(
self._end, group, method, timeout, metadata,
request_iterator, self._pool)
def inline_stream_stream(
self, group, method, request_iterator, timeout, metadata=None):
return _calls.inline_stream_stream(
self._end, group, method, timeout, metadata,
request_iterator, self._pool)
def event_unary_unary(
self, group, method, request, receiver, abortion_callback, timeout,
metadata=None):
return _calls.event_unary_unary(
self._end, group, method, timeout, metadata, request,
receiver, abortion_callback, self._pool)
def event_unary_stream(
self, group, method, request, receiver, abortion_callback, timeout,
metadata=None):
return _calls.event_unary_stream(
self._end, group, method, timeout, metadata, request,
receiver, abortion_callback, self._pool)
def event_stream_unary(
self, group, method, receiver, abortion_callback, timeout,
metadata=None):
return _calls.event_stream_unary(
self._end, group, method, timeout, metadata, receiver,
abortion_callback, self._pool)
def event_stream_stream(
self, group, method, receiver, abortion_callback, timeout,
metadata=None):
return _calls.event_stream_stream(
self._end, group, method, timeout, metadata, receiver,
abortion_callback, self._pool)
def unary_unary(self, group, method):
return _UnaryUnaryMultiCallable(self._end, group, method, self._pool)
def unary_stream(self, group, method):
return _UnaryStreamMultiCallable(self._end, group, method, self._pool)
def stream_unary(self, group, method):
return _StreamUnaryMultiCallable(self._end, group, method, self._pool)
def stream_stream(self, group, method):
return _StreamStreamMultiCallable(self._end, group, method, self._pool)
class _DynamicStub(face.DynamicStub):
"""An face.DynamicStub implementation."""
def __init__(self, end, group, cardinalities, pool):
self._end = end
self._group = group
self._cardinalities = cardinalities
self._pool = pool
def __getattr__(self, attr):
method_cardinality = self._cardinalities.get(attr)
if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
return _UnaryUnaryMultiCallable(self._end, self._group, attr, self._pool)
elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
return _UnaryStreamMultiCallable(self._end, self._group, attr, self._pool)
elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
return _StreamUnaryMultiCallable(self._end, self._group, attr, self._pool)
elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
return _StreamStreamMultiCallable(
self._end, self._group, attr, self._pool)
else:
raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
def _adapt_method_implementations(method_implementations, pool):
adapted_implementations = {}
for name, method_implementation in method_implementations.iteritems():
if method_implementation.style is style.Service.INLINE:
if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
adapted_implementations[name] = _service.adapt_inline_unary_unary(
method_implementation.unary_unary_inline, pool)
elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
adapted_implementations[name] = _service.adapt_inline_unary_stream(
method_implementation.unary_stream_inline, pool)
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
adapted_implementations[name] = _service.adapt_inline_stream_unary(
method_implementation.stream_unary_inline, pool)
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
adapted_implementations[name] = _service.adapt_inline_stream_stream(
method_implementation.stream_stream_inline, pool)
elif method_implementation.style is style.Service.EVENT:
if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
adapted_implementations[name] = _service.adapt_event_unary_unary(
method_implementation.unary_unary_event, pool)
elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
adapted_implementations[name] = _service.adapt_event_unary_stream(
method_implementation.unary_stream_event, pool)
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
adapted_implementations[name] = _service.adapt_event_stream_unary(
method_implementation.stream_unary_event, pool)
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
adapted_implementations[name] = _service.adapt_event_stream_stream(
method_implementation.stream_stream_event, pool)
return adapted_implementations
def servicer(method_implementations, multi_method_implementation, pool):
"""Creates a base.Servicer.
It is guaranteed that any passed face.MultiMethodImplementation will
only be called to service an RPC if there is no
face.MethodImplementation for the RPC method in the passed
method_implementations dictionary.
Args:
method_implementations: A dictionary from RPC method name to
face.MethodImplementation object to be used to service the named
RPC method.
multi_method_implementation: An face.MultiMethodImplementation to be
used to service any RPCs not serviced by the
face.MethodImplementations given in the method_implementations
dictionary, or None.
pool: A thread pool.
Returns:
A base.Servicer that services RPCs via the given implementations.
"""
adapted_implementations = _adapt_method_implementations(
method_implementations, pool)
adapted_multi_method_implementation = _service.adapt_multi_method(
multi_method_implementation, pool)
return _BaseServicer(
adapted_implementations, adapted_multi_method_implementation)
def generic_stub(end, pool):
"""Creates an face.GenericStub.
Args:
end: A base.End.
pool: A futures.ThreadPoolExecutor.
Returns:
A face.GenericStub that performs RPCs via the given base.End.
"""
return _GenericStub(end, pool)
def dynamic_stub(end, group, cardinalities, pool):
"""Creates an face.DynamicStub.
Args:
end: A base.End.
group: The group identifier for all RPCs to be made with the created
face.DynamicStub.
cardinalities: A dict from method identifier to cardinality.Cardinality
value identifying the cardinality of every RPC method to be supported by
the created face.DynamicStub.
pool: A futures.ThreadPoolExecutor.
Returns:
A face.DynamicStub that performs RPCs via the given base.End.
"""
return _DynamicStub(end, group, cardinalities, pool)

@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests the RPC Framework Core's implementation of the Base interface.""" """Tests Base interface compliance of the core-over-gRPC-links stack."""
import collections import collections
import logging import logging

@ -0,0 +1,160 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests Face compliance of the crust-over-core-over-gRPC-links stack."""
import collections
import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
from grpc.framework.core import implementations as core_implementations
from grpc.framework.crust import implementations as crust_implementations
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.links import utilities
from grpc_test import test_common
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.face import test_cases
from grpc_test.framework.interfaces.face import test_interfaces
from grpc_test.framework.interfaces.links import test_utilities
class _SerializationBehaviors(
collections.namedtuple(
'_SerializationBehaviors',
('request_serializers', 'request_deserializers', 'response_serializers',
'response_deserializers',))):
pass
def _serialization_behaviors_from_test_methods(test_methods):
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for (group, method), test_method in test_methods.iteritems():
request_serializers[group, method] = test_method.serialize_request
request_deserializers[group, method] = test_method.deserialize_request
response_serializers[group, method] = test_method.serialize_response
response_deserializers[group, method] = test_method.deserialize_response
return _SerializationBehaviors(
request_serializers, request_deserializers, response_serializers,
response_deserializers)
class _Implementation(test_interfaces.Implementation):
def instantiate(
self, methods, method_implementations, multi_method_implementation):
pool = logging_pool.pool(test_constants.POOL_SIZE)
servicer = crust_implementations.servicer(
method_implementations, multi_method_implementation, pool)
serialization_behaviors = _serialization_behaviors_from_test_methods(
methods)
invocation_end_link = core_implementations.invocation_end_link()
service_end_link = core_implementations.service_end_link(
servicer, test_constants.DEFAULT_TIMEOUT,
test_constants.MAXIMUM_TIMEOUT)
service_grpc_link = service.service_link(
serialization_behaviors.request_deserializers,
serialization_behaviors.response_serializers)
port = service_grpc_link.add_port(0, None)
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link(
channel, b'localhost',
serialization_behaviors.request_serializers,
serialization_behaviors.response_deserializers)
invocation_end_link.join_link(invocation_grpc_link)
invocation_grpc_link.join_link(invocation_end_link)
service_grpc_link.join_link(service_end_link)
service_end_link.join_link(service_grpc_link)
service_end_link.start()
invocation_end_link.start()
invocation_grpc_link.start()
service_grpc_link.start()
generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
# TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
group = next(iter(methods))[0]
# TODO(nathaniel): Add a "cardinalities_by_group" attribute to
# _digest.TestServiceDigest.
cardinalities = {
method: method_object.cardinality()
for (group, method), method_object in methods.iteritems()}
dynamic_stub = crust_implementations.dynamic_stub(
invocation_end_link, group, cardinalities, pool)
return generic_stub, {group: dynamic_stub}, (
invocation_end_link, invocation_grpc_link, service_grpc_link,
service_end_link, pool)
def destantiate(self, memo):
(invocation_end_link, invocation_grpc_link, service_grpc_link,
service_end_link, pool) = memo
invocation_end_link.stop(0).wait()
invocation_grpc_link.stop()
service_grpc_link.stop_gracefully()
service_end_link.stop(0).wait()
invocation_end_link.join_link(utilities.NULL_LINK)
invocation_grpc_link.join_link(utilities.NULL_LINK)
service_grpc_link.join_link(utilities.NULL_LINK)
service_end_link.join_link(utilities.NULL_LINK)
pool.shutdown(wait=True)
def invocation_metadata(self):
return test_common.INVOCATION_INITIAL_METADATA
def initial_metadata(self):
return test_common.SERVICE_INITIAL_METADATA
def terminal_metadata(self):
return test_common.SERVICE_TERMINAL_METADATA
def code(self):
return _intermediary_low.Code.OK
def details(self):
return test_common.DETAILS
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
original_metadata, transmitted_metadata)
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,111 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests Face interface compliance of the crust-over-core stack."""
import collections
import unittest
from grpc.framework.core import implementations as core_implementations
from grpc.framework.crust import implementations as crust_implementations
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.links import utilities
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.face import test_cases
from grpc_test.framework.interfaces.face import test_interfaces
from grpc_test.framework.interfaces.links import test_utilities
class _Implementation(test_interfaces.Implementation):
def instantiate(
self, methods, method_implementations, multi_method_implementation):
pool = logging_pool.pool(test_constants.POOL_SIZE)
servicer = crust_implementations.servicer(
method_implementations, multi_method_implementation, pool)
service_end_link = core_implementations.service_end_link(
servicer, test_constants.DEFAULT_TIMEOUT,
test_constants.MAXIMUM_TIMEOUT)
invocation_end_link = core_implementations.invocation_end_link()
invocation_end_link.join_link(service_end_link)
service_end_link.join_link(invocation_end_link)
service_end_link.start()
invocation_end_link.start()
generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
# TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
group = next(iter(methods))[0]
# TODO(nathaniel): Add a "cardinalities_by_group" attribute to
# _digest.TestServiceDigest.
cardinalities = {
method: method_object.cardinality()
for (group, method), method_object in methods.iteritems()}
dynamic_stub = crust_implementations.dynamic_stub(
invocation_end_link, group, cardinalities, pool)
return generic_stub, {group: dynamic_stub}, (
invocation_end_link, service_end_link, pool)
def destantiate(self, memo):
invocation_end_link, service_end_link, pool = memo
invocation_end_link.stop(0).wait()
service_end_link.stop(0).wait()
invocation_end_link.join_link(utilities.NULL_LINK)
service_end_link.join_link(utilities.NULL_LINK)
pool.shutdown(wait=True)
def invocation_metadata(self):
return object()
def initial_metadata(self):
return object()
def terminal_metadata(self):
return object()
def code(self):
return object()
def details(self):
return object()
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is transmitted_metadata
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,37 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A test constant working around issue 3069."""
# test_constants is referenced from specification in this module.
from grpc_test.framework.common import test_constants # pylint: disable=unused-import
# TODO(issue 3069): Replace uses of this constant with
# test_constants.SHORT_TIMEOUT.
REALLY_SHORT_TIMEOUT = 0.1

@ -37,6 +37,7 @@ from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants from grpc_test.framework.common import test_constants
from grpc_test.framework.common import test_control from grpc_test.framework.common import test_control
from grpc_test.framework.common import test_coverage from grpc_test.framework.common import test_coverage
from grpc_test.framework.interfaces.face import _3069_test_constant
from grpc_test.framework.interfaces.face import _digest from grpc_test.framework.interfaces.face import _digest
from grpc_test.framework.interfaces.face import _stock_service from grpc_test.framework.interfaces.face import _stock_service
from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
@ -170,7 +171,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(), self.assertRaises( with self._control.pause(), self.assertRaises(
face.ExpirationError): face.ExpirationError):
self._invoker.blocking(group, method)( self._invoker.blocking(group, method)(
request, test_constants.SHORT_TIMEOUT) request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self): def testExpiredUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in ( for (group, method), test_messages_sequence in (
@ -181,7 +182,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(), self.assertRaises( with self._control.pause(), self.assertRaises(
face.ExpirationError): face.ExpirationError):
response_iterator = self._invoker.blocking(group, method)( response_iterator = self._invoker.blocking(group, method)(
request, test_constants.SHORT_TIMEOUT) request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator) list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self): def testExpiredStreamRequestUnaryResponse(self):
@ -193,7 +194,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(), self.assertRaises( with self._control.pause(), self.assertRaises(
face.ExpirationError): face.ExpirationError):
self._invoker.blocking(group, method)( self._invoker.blocking(group, method)(
iter(requests), test_constants.SHORT_TIMEOUT) iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
def testExpiredStreamRequestStreamResponse(self): def testExpiredStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in ( for (group, method), test_messages_sequence in (
@ -204,7 +205,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(), self.assertRaises( with self._control.pause(), self.assertRaises(
face.ExpirationError): face.ExpirationError):
response_iterator = self._invoker.blocking(group, method)( response_iterator = self._invoker.blocking(group, method)(
iter(requests), test_constants.SHORT_TIMEOUT) iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator) list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self): def testFailedUnaryRequestUnaryResponse(self):

@ -37,6 +37,7 @@ from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants from grpc_test.framework.common import test_constants
from grpc_test.framework.common import test_control from grpc_test.framework.common import test_control
from grpc_test.framework.common import test_coverage from grpc_test.framework.common import test_coverage
from grpc_test.framework.interfaces.face import _3069_test_constant
from grpc_test.framework.interfaces.face import _digest from grpc_test.framework.interfaces.face import _digest
from grpc_test.framework.interfaces.face import _receiver from grpc_test.framework.interfaces.face import _receiver
from grpc_test.framework.interfaces.face import _stock_service from grpc_test.framework.interfaces.face import _stock_service
@ -264,7 +265,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(): with self._control.pause():
self._invoker.event(group, method)( self._invoker.event(group, method)(
request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT) request, receiver, receiver.abort,
_3069_test_constant.REALLY_SHORT_TIMEOUT)
receiver.block_until_terminated() receiver.block_until_terminated()
self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
@ -278,7 +280,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(): with self._control.pause():
self._invoker.event(group, method)( self._invoker.event(group, method)(
request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT) request, receiver, receiver.abort,
_3069_test_constant.REALLY_SHORT_TIMEOUT)
receiver.block_until_terminated() receiver.block_until_terminated()
self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
@ -290,7 +293,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
receiver = _receiver.Receiver() receiver = _receiver.Receiver()
self._invoker.event(group, method)( self._invoker.event(group, method)(
receiver, receiver.abort, test_constants.SHORT_TIMEOUT) receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT)
receiver.block_until_terminated() receiver.block_until_terminated()
self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
@ -303,7 +306,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
receiver = _receiver.Receiver() receiver = _receiver.Receiver()
call_consumer = self._invoker.event(group, method)( call_consumer = self._invoker.event(group, method)(
receiver, receiver.abort, test_constants.SHORT_TIMEOUT) receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT)
for request in requests: for request in requests:
call_consumer.consume(request) call_consumer.consume(request)
receiver.block_until_terminated() receiver.block_until_terminated()

@ -40,6 +40,7 @@ from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants from grpc_test.framework.common import test_constants
from grpc_test.framework.common import test_control from grpc_test.framework.common import test_control
from grpc_test.framework.common import test_coverage from grpc_test.framework.common import test_coverage
from grpc_test.framework.interfaces.face import _3069_test_constant
from grpc_test.framework.interfaces.face import _digest from grpc_test.framework.interfaces.face import _digest
from grpc_test.framework.interfaces.face import _stock_service from grpc_test.framework.interfaces.face import _stock_service
from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
@ -265,7 +266,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(): with self._control.pause():
response_future = self._invoker.future( response_future = self._invoker.future(
group, method)(request, test_constants.SHORT_TIMEOUT) group, method)(request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
self.assertIsInstance( self.assertIsInstance(
response_future.exception(), face.ExpirationError) response_future.exception(), face.ExpirationError)
with self.assertRaises(face.ExpirationError): with self.assertRaises(face.ExpirationError):
@ -279,7 +280,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(): with self._control.pause():
response_iterator = self._invoker.future(group, method)( response_iterator = self._invoker.future(group, method)(
request, test_constants.SHORT_TIMEOUT) request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError): with self.assertRaises(face.ExpirationError):
list(response_iterator) list(response_iterator)
@ -291,7 +292,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(): with self._control.pause():
response_future = self._invoker.future(group, method)( response_future = self._invoker.future(group, method)(
iter(requests), test_constants.SHORT_TIMEOUT) iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
self.assertIsInstance( self.assertIsInstance(
response_future.exception(), face.ExpirationError) response_future.exception(), face.ExpirationError)
with self.assertRaises(face.ExpirationError): with self.assertRaises(face.ExpirationError):
@ -305,7 +306,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(): with self._control.pause():
response_iterator = self._invoker.future(group, method)( response_iterator = self._invoker.future(group, method)(
iter(requests), test_constants.SHORT_TIMEOUT) iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError): with self.assertRaises(face.ExpirationError):
list(response_iterator) list(response_iterator)
@ -317,7 +318,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.fail(): with self._control.fail():
response_future = self._invoker.future(group, method)( response_future = self._invoker.future(group, method)(
request, test_constants.SHORT_TIMEOUT) request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the # Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is # servicer-side runtime called into it its failure is
@ -340,7 +341,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
# expiration of the RPC. # expiration of the RPC.
with self._control.fail(), self.assertRaises(face.ExpirationError): with self._control.fail(), self.assertRaises(face.ExpirationError):
response_iterator = self._invoker.future(group, method)( response_iterator = self._invoker.future(group, method)(
request, test_constants.SHORT_TIMEOUT) request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator) list(response_iterator)
def testFailedStreamRequestUnaryResponse(self): def testFailedStreamRequestUnaryResponse(self):
@ -351,7 +352,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.fail(): with self._control.fail():
response_future = self._invoker.future(group, method)( response_future = self._invoker.future(group, method)(
iter(requests), test_constants.SHORT_TIMEOUT) iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the # Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is # servicer-side runtime called into it its failure is
@ -374,5 +375,5 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
# expiration of the RPC. # expiration of the RPC.
with self._control.fail(), self.assertRaises(face.ExpirationError): with self._control.fail(), self.assertRaises(face.ExpirationError):
response_iterator = self._invoker.future(group, method)( response_iterator = self._invoker.future(group, method)(
iter(requests), test_constants.SHORT_TIMEOUT) iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator) list(response_iterator)

@ -1,4 +1,4 @@
B# Copyright 2015, Google Inc. # Copyright 2015, Google Inc.
# All rights reserved. # All rights reserved.
# #
# Redistribution and use in source and binary forms, with or without # Redistribution and use in source and binary forms, with or without

@ -45,6 +45,8 @@ source "python"$PYVER"_virtual_environment"/bin/activate
# py.test (or find another tool or *something*) that's acceptable to the rest of # py.test (or find another tool or *something*) that's acceptable to the rest of
# the team... # the team...
"python"$PYVER -m grpc_test._core_over_links_base_interface_test "python"$PYVER -m grpc_test._core_over_links_base_interface_test
"python"$PYVER -m grpc_test._crust_over_core_over_links_face_interface_test
"python"$PYVER -m grpc_test.framework._crust_over_core_face_interface_test
"python"$PYVER -m grpc_test.framework.core._base_interface_test "python"$PYVER -m grpc_test.framework.core._base_interface_test
"python"$PYVER $GRPCIO_TEST/setup.py test -a "-n8 --cov=grpc --junitxml=./report.xml" "python"$PYVER $GRPCIO_TEST/setup.py test -a "-n8 --cov=grpc --junitxml=./report.xml"

Loading…
Cancel
Save