A test suite for the RPC Framework base interface

I wasn't able to flesh this out nearly as much as I had wanted to but I
can come back to it after Beta (issue #2959).
pull/2963/head
Nathaniel Manista 9 years ago
parent d6225ee6eb
commit 04b4ca121f
  1. 10
      src/python/grpcio_test/grpc_test/framework/common/test_constants.py
  2. 30
      src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py
  3. 568
      src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
  4. 168
      src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
  5. 55
      src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py
  6. 260
      src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
  7. 186
      src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py

@ -29,15 +29,25 @@
"""Constants shared among tests throughout RPC Framework."""
# Value for maximum duration in seconds that a test is allowed for its actual
# behavioral logic, excluding all time spent deliberately waiting in the test.
TIME_ALLOWANCE = 10
# Value for maximum duration in seconds of RPCs that may time out as part of a
# test.
SHORT_TIMEOUT = 4
# Absurdly large value for maximum duration in seconds for should-not-time-out
# RPCs made during tests.
LONG_TIMEOUT = 3000
# Values to supply on construction of an object that will service RPCs; these
# should not be used as the actual timeout values of any RPCs made during tests.
DEFAULT_TIMEOUT = 300
MAXIMUM_TIMEOUT = 3600
# The number of payloads to transmit in streaming tests.
STREAM_LENGTH = 200
# The size of payloads to transmit in tests.
PAYLOAD_SIZE = 256 * 1024 + 17
# The size of thread pools to use in tests.
POOL_SIZE = 10

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,568 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Part of the tests of the base interface of RPC Framework."""
import abc
import collections
import enum
import random # pylint: disable=unused-import
import threading
import time
from grpc.framework.interfaces.base import base
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.base import _sequence
from grpc_test.framework.interfaces.base import _state
from grpc_test.framework.interfaces.base import test_interfaces # pylint: disable=unused-import
_GROUP = 'base test cases test group'
_METHOD = 'base test cases test method'
_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE / 20
_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE / 600
def _create_payload(randomness):
length = randomness.randint(
_MINIMUM_PAYLOAD_SIZE, test_constants.PAYLOAD_SIZE)
random_section_length = randomness.randint(
0, min(_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE, length))
random_section = bytes(
bytearray(
randomness.getrandbits(8) for _ in range(random_section_length)))
sevens_section = '\x07' * (length - random_section_length)
return b''.join(randomness.sample((random_section, sevens_section), 2))
def _anything_in_flight(state):
return (
state.invocation_initial_metadata_in_flight is not None or
state.invocation_payloads_in_flight or
state.invocation_completion_in_flight is not None or
state.service_initial_metadata_in_flight is not None or
state.service_payloads_in_flight or
state.service_completion_in_flight is not None or
0 < state.invocation_allowance_in_flight or
0 < state.service_allowance_in_flight
)
def _verify_service_advance_and_update_state(
initial_metadata, payload, completion, allowance, state, implementation):
if initial_metadata is not None:
if state.invocation_initial_metadata_received:
return 'Later invocation initial metadata received: %s' % (
initial_metadata,)
if state.invocation_payloads_received:
return 'Invocation initial metadata received after payloads: %s' % (
state.invocation_payloads_received)
if state.invocation_completion_received:
return 'Invocation initial metadata received after invocation completion!'
if not implementation.metadata_transmitted(
state.invocation_initial_metadata_in_flight, initial_metadata):
return 'Invocation initial metadata maltransmitted: %s, %s' % (
state.invocation_initial_metadata_in_flight, initial_metadata)
else:
state.invocation_initial_metadata_in_flight = None
state.invocation_initial_metadata_received = True
if payload is not None:
if state.invocation_completion_received:
return 'Invocation payload received after invocation completion!'
elif not state.invocation_payloads_in_flight:
return 'Invocation payload "%s" received but not in flight!' % (payload,)
elif state.invocation_payloads_in_flight[0] != payload:
return 'Invocation payload mismatch: %s, %s' % (
state.invocation_payloads_in_flight[0], payload)
elif state.service_side_invocation_allowance < 1:
return 'Disallowed invocation payload!'
else:
state.invocation_payloads_in_flight.pop(0)
state.invocation_payloads_received += 1
state.service_side_invocation_allowance -= 1
if completion is not None:
if state.invocation_completion_received:
return 'Later invocation completion received: %s' % (completion,)
elif not implementation.completion_transmitted(
state.invocation_completion_in_flight, completion):
return 'Invocation completion maltransmitted: %s, %s' % (
state.invocation_completion_in_flight, completion)
else:
state.invocation_completion_in_flight = None
state.invocation_completion_received = True
if allowance is not None:
if allowance <= 0:
return 'Illegal allowance value: %s' % (allowance,)
else:
state.service_allowance_in_flight -= allowance
state.service_side_service_allowance += allowance
def _verify_invocation_advance_and_update_state(
initial_metadata, payload, completion, allowance, state, implementation):
if initial_metadata is not None:
if state.service_initial_metadata_received:
return 'Later service initial metadata received: %s' % (initial_metadata,)
if state.service_payloads_received:
return 'Service initial metadata received after service payloads: %s' % (
state.service_payloads_received)
if state.service_completion_received:
return 'Service initial metadata received after service completion!'
if not implementation.metadata_transmitted(
state.service_initial_metadata_in_flight, initial_metadata):
return 'Service initial metadata maltransmitted: %s, %s' % (
state.service_initial_metadata_in_flight, initial_metadata)
else:
state.service_initial_metadata_in_flight = None
state.service_initial_metadata_received = True
if payload is not None:
if state.service_completion_received:
return 'Service payload received after service completion!'
elif not state.service_payloads_in_flight:
return 'Service payload "%s" received but not in flight!' % (payload,)
elif state.service_payloads_in_flight[0] != payload:
return 'Service payload mismatch: %s, %s' % (
state.invocation_payloads_in_flight[0], payload)
elif state.invocation_side_service_allowance < 1:
return 'Disallowed service payload!'
else:
state.service_payloads_in_flight.pop(0)
state.service_payloads_received += 1
state.invocation_side_service_allowance -= 1
if completion is not None:
if state.service_completion_received:
return 'Later service completion received: %s' % (completion,)
elif not implementation.completion_transmitted(
state.service_completion_in_flight, completion):
return 'Service completion maltransmitted: %s, %s' % (
state.service_completion_in_flight, completion)
else:
state.service_completion_in_flight = None
state.service_completion_received = True
if allowance is not None:
if allowance <= 0:
return 'Illegal allowance value: %s' % (allowance,)
else:
state.invocation_allowance_in_flight -= allowance
state.invocation_side_service_allowance += allowance
class Invocation(
collections.namedtuple(
'Invocation',
('group', 'method', 'subscription_kind', 'timeout', 'initial_metadata',
'payload', 'completion',))):
"""A description of operation invocation.
Attributes:
group: The group identifier for the operation.
method: The method identifier for the operation.
subscription_kind: A base.Subscription.Kind value describing the kind of
subscription to use for the operation.
timeout: A duration in seconds to pass as the timeout value for the
operation.
initial_metadata: An object to pass as the initial metadata for the
operation or None.
payload: An object to pass as a payload value for the operation or None.
completion: An object to pass as a completion value for the operation or
None.
"""
class OnAdvance(
collections.namedtuple(
'OnAdvance',
('kind', 'initial_metadata', 'payload', 'completion', 'allowance'))):
"""Describes action to be taken in a test in response to an advance call.
Attributes:
kind: A Kind value describing the overall kind of response.
initial_metadata: An initial metadata value to pass to a call of the advance
method of the operator under test. Only valid if kind is Kind.ADVANCE and
may be None.
payload: A payload value to pass to a call of the advance method of the
operator under test. Only valid if kind is Kind.ADVANCE and may be None.
completion: A base.Completion value to pass to a call of the advance method
of the operator under test. Only valid if kind is Kind.ADVANCE and may be
None.
allowance: An allowance value to pass to a call of the advance method of the
operator under test. Only valid if kind is Kind.ADVANCE and may be None.
"""
@enum.unique
class Kind(enum.Enum):
ADVANCE = 'advance'
DEFECT = 'defect'
IDLE = 'idle'
_DEFECT_ON_ADVANCE = OnAdvance(OnAdvance.Kind.DEFECT, None, None, None, None)
_IDLE_ON_ADVANCE = OnAdvance(OnAdvance.Kind.IDLE, None, None, None, None)
class Instruction(
collections.namedtuple(
'Instruction',
('kind', 'advance_args', 'advance_kwargs', 'conclude_success',
'conclude_message', 'conclude_invocation_outcome',
'conclude_service_outcome',))):
""""""
@enum.unique
class Kind(enum.Enum):
ADVANCE = 'ADVANCE'
CANCEL = 'CANCEL'
CONCLUDE = 'CONCLUDE'
class Controller(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def failed(self, message):
""""""
raise NotImplementedError()
@abc.abstractmethod
def serialize_request(self, request):
""""""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_request(self, serialized_request):
""""""
raise NotImplementedError()
@abc.abstractmethod
def serialize_response(self, response):
""""""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_response(self, serialized_response):
""""""
raise NotImplementedError()
@abc.abstractmethod
def invocation(self):
""""""
raise NotImplementedError()
@abc.abstractmethod
def poll(self):
""""""
raise NotImplementedError()
@abc.abstractmethod
def on_service_advance(
self, initial_metadata, payload, completion, allowance):
""""""
raise NotImplementedError()
@abc.abstractmethod
def on_invocation_advance(
self, initial_metadata, payload, completion, allowance):
""""""
raise NotImplementedError()
@abc.abstractmethod
def service_on_termination(self, outcome):
""""""
raise NotImplementedError()
@abc.abstractmethod
def invocation_on_termination(self, outcome):
""""""
raise NotImplementedError()
class ControllerCreator(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def name(self):
""""""
raise NotImplementedError()
@abc.abstractmethod
def controller(self, implementation, randomness):
""""""
raise NotImplementedError()
class _Remainder(
collections.namedtuple(
'_Remainder',
('invocation_payloads', 'service_payloads', 'invocation_completion',
'service_completion',))):
"""Describes work remaining to be done in a portion of a test.
Attributes:
invocation_payloads: The number of payloads to be sent from the invocation
side of the operation to the service side of the operation.
service_payloads: The number of payloads to be sent from the service side of
the operation to the invocation side of the operation.
invocation_completion: Whether or not completion from the invocation side of
the operation should be indicated and has yet to be indicated.
service_completion: Whether or not completion from the service side of the
operation should be indicated and has yet to be indicated.
"""
class _SequenceController(Controller):
def __init__(self, sequence, implementation, randomness):
"""Constructor.
Args:
sequence: A _sequence.Sequence describing the steps to be taken in the
test at a relatively high level.
implementation: A test_interfaces.Implementation encapsulating the
base interface implementation that is the system under test.
randomness: A random.Random instance for use in the test.
"""
self._condition = threading.Condition()
self._sequence = sequence
self._implementation = implementation
self._randomness = randomness
self._until = None
self._remaining_elements = None
self._poll_next = None
self._message = None
self._state = _state.OperationState()
self._todo = None
# called with self._condition
def _failed(self, message):
self._message = message
self._condition.notify_all()
def _passed(self, invocation_outcome, service_outcome):
self._poll_next = Instruction(
Instruction.Kind.CONCLUDE, None, None, True, None, invocation_outcome,
service_outcome)
self._condition.notify_all()
def failed(self, message):
with self._condition:
self._failed(message)
def serialize_request(self, request):
return request + request
def deserialize_request(self, serialized_request):
return serialized_request[:len(serialized_request) / 2]
def serialize_response(self, response):
return response * 3
def deserialize_response(self, serialized_response):
return serialized_response[2 * len(serialized_response) / 3:]
def invocation(self):
with self._condition:
self._until = time.time() + self._sequence.maximum_duration
self._remaining_elements = list(self._sequence.elements)
if self._sequence.invocation.initial_metadata:
initial_metadata = self._implementation.invocation_initial_metadata()
self._state.invocation_initial_metadata_in_flight = initial_metadata
else:
initial_metadata = None
if self._sequence.invocation.payload:
payload = _create_payload(self._randomness)
self._state.invocation_payloads_in_flight.append(payload)
else:
payload = None
if self._sequence.invocation.complete:
completion = self._implementation.invocation_completion()
self._state.invocation_completion_in_flight = completion
else:
completion = None
return Invocation(
_GROUP, _METHOD, base.Subscription.Kind.FULL,
self._sequence.invocation.timeout, initial_metadata, payload,
completion)
def poll(self):
with self._condition:
while True:
if self._message is not None:
return Instruction(
Instruction.Kind.CONCLUDE, None, None, False, self._message, None,
None)
elif self._poll_next:
poll_next = self._poll_next
self._poll_next = None
return poll_next
elif self._until < time.time():
return Instruction(
Instruction.Kind.CONCLUDE, None, None, False,
'overran allotted time!', None, None)
else:
self._condition.wait(timeout=self._until-time.time())
def on_service_advance(
self, initial_metadata, payload, completion, allowance):
with self._condition:
message = _verify_service_advance_and_update_state(
initial_metadata, payload, completion, allowance, self._state,
self._implementation)
if message is not None:
self._failed(message)
if self._todo is not None:
raise ValueError('TODO!!!')
elif _anything_in_flight(self._state):
return _IDLE_ON_ADVANCE
elif self._remaining_elements:
element = self._remaining_elements.pop(0)
if element.kind is _sequence.Element.Kind.SERVICE_TRANSMISSION:
if element.transmission.initial_metadata:
initial_metadata = self._implementation.service_initial_metadata()
self._state.service_initial_metadata_in_flight = initial_metadata
else:
initial_metadata = None
if element.transmission.payload:
payload = _create_payload(self._randomness)
self._state.service_payloads_in_flight.append(payload)
self._state.service_side_service_allowance -= 1
else:
payload = None
if element.transmission.complete:
completion = self._implementation.service_completion()
self._state.service_completion_in_flight = completion
else:
completion = None
if (not self._state.invocation_completion_received and
0 <= self._state.service_side_invocation_allowance):
allowance = 1
self._state.service_side_invocation_allowance += 1
self._state.invocation_allowance_in_flight += 1
else:
allowance = None
return OnAdvance(
OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion,
allowance)
else:
raise ValueError('TODO!!!')
else:
return _IDLE_ON_ADVANCE
def on_invocation_advance(
self, initial_metadata, payload, completion, allowance):
with self._condition:
message = _verify_invocation_advance_and_update_state(
initial_metadata, payload, completion, allowance, self._state,
self._implementation)
if message is not None:
self._failed(message)
if self._todo is not None:
raise ValueError('TODO!!!')
elif _anything_in_flight(self._state):
return _IDLE_ON_ADVANCE
elif self._remaining_elements:
element = self._remaining_elements.pop(0)
if element.kind is _sequence.Element.Kind.INVOCATION_TRANSMISSION:
if element.transmission.initial_metadata:
initial_metadata = self._implementation.invocation_initial_metadata()
self._state.invocation_initial_metadata_in_fight = initial_metadata
else:
initial_metadata = None
if element.transmission.payload:
payload = _create_payload(self._randomness)
self._state.invocation_payloads_in_flight.append(payload)
self._state.invocation_side_invocation_allowance -= 1
else:
payload = None
if element.transmission.complete:
completion = self._implementation.invocation_completion()
self._state.invocation_completion_in_flight = completion
else:
completion = None
if (not self._state.service_completion_received and
0 <= self._state.invocation_side_service_allowance):
allowance = 1
self._state.invocation_side_service_allowance += 1
self._state.service_allowance_in_flight += 1
else:
allowance = None
return OnAdvance(
OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion,
allowance)
else:
raise ValueError('TODO!!!')
else:
return _IDLE_ON_ADVANCE
def service_on_termination(self, outcome):
with self._condition:
self._state.service_side_outcome = outcome
if self._todo is not None or self._remaining_elements:
self._failed('Premature service-side outcome %s!' % (outcome,))
elif outcome is not self._sequence.outcome.service:
self._failed(
'Incorrect service-side outcome: %s should have been %s' % (
outcome, self._sequence.outcome.service))
elif self._state.invocation_side_outcome is not None:
self._passed(self._state.invocation_side_outcome, outcome)
def invocation_on_termination(self, outcome):
with self._condition:
self._state.invocation_side_outcome = outcome
if self._todo is not None or self._remaining_elements:
self._failed('Premature invocation-side outcome %s!' % (outcome,))
elif outcome is not self._sequence.outcome.invocation:
self._failed(
'Incorrect invocation-side outcome: %s should have been %s' % (
outcome, self._sequence.outcome.invocation))
elif self._state.service_side_outcome is not None:
self._passed(outcome, self._state.service_side_outcome)
class _SequenceControllerCreator(ControllerCreator):
def __init__(self, sequence):
self._sequence = sequence
def name(self):
return self._sequence.name
def controller(self, implementation, randomness):
return _SequenceController(self._sequence, implementation, randomness)
CONTROLLER_CREATORS = tuple(
_SequenceControllerCreator(sequence) for sequence in _sequence.SEQUENCES)

@ -0,0 +1,168 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Part of the tests of the base interface of RPC Framework."""
import collections
import enum
from grpc.framework.interfaces.base import base
from grpc_test.framework.common import test_constants
class Invocation(
collections.namedtuple(
'Invocation', ('timeout', 'initial_metadata', 'payload', 'complete',))):
"""A recipe for operation invocation.
Attributes:
timeout: A duration in seconds to pass to the system under test as the
operation's timeout value.
initial_metadata: A boolean indicating whether or not to pass initial
metadata when invoking the operation.
payload: A boolean indicating whether or not to pass a payload when
invoking the operation.
complete: A boolean indicating whether or not to indicate completion of
transmissions from the invoking side of the operation when invoking the
operation.
"""
class Transmission(
collections.namedtuple(
'Transmission', ('initial_metadata', 'payload', 'complete',))):
"""A recipe for a single transmission in an operation.
Attributes:
initial_metadata: A boolean indicating whether or not to pass initial
metadata as part of the transmission.
payload: A boolean indicating whether or not to pass a payload as part of
the transmission.
complete: A boolean indicating whether or not to indicate completion of
transmission from the transmitting side of the operation as part of the
transmission.
"""
class Intertransmission(
collections.namedtuple('Intertransmission', ('invocation', 'service',))):
"""A recipe for multiple transmissions in an operation.
Attributes:
invocation: An integer describing the number of payloads to send from the
invocation side of the operation to the service side.
service: An integer describing the number of payloads to send from the
service side of the operation to the invocation side.
"""
class Element(collections.namedtuple('Element', ('kind', 'transmission',))):
"""A sum type for steps to perform when testing an operation.
Attributes:
kind: A Kind value describing the kind of step to perform in the test.
transmission: Only valid for kinds Kind.INVOCATION_TRANSMISSION and
Kind.SERVICE_TRANSMISSION, a Transmission value describing the details of
the transmission to be made.
"""
@enum.unique
class Kind(enum.Enum):
INVOCATION_TRANSMISSION = 'invocation transmission'
SERVICE_TRANSMISSION = 'service transmission'
INTERTRANSMISSION = 'intertransmission'
INVOCATION_CANCEL = 'invocation cancel'
SERVICE_CANCEL = 'service cancel'
INVOCATION_FAILURE = 'invocation failure'
SERVICE_FAILURE = 'service failure'
class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))):
"""A description of the expected outcome of an operation test.
Attributes:
invocation: The base.Outcome value expected on the invocation side of the
operation.
service: The base.Outcome value expected on the service side of the
operation.
"""
class Sequence(
collections.namedtuple(
'Sequence',
('name', 'maximum_duration', 'invocation', 'elements', 'outcome',))):
"""Describes at a high level steps to perform in a test.
Attributes:
name: The string name of the sequence.
maximum_duration: A length of time in seconds to allow for the test before
declaring it to have failed.
invocation: An Invocation value describing how to invoke the operation
under test.
elements: A sequence of Element values describing at coarse granularity
actions to take during the operation under test.
outcome: An Outcome value describing the expected outcome of the test.
"""
_EASY = Sequence(
'Easy',
test_constants.TIME_ALLOWANCE,
Invocation(test_constants.LONG_TIMEOUT, True, True, True),
(
Element(
Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)),
),
Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
_PEASY = Sequence(
'Peasy',
test_constants.TIME_ALLOWANCE,
Invocation(test_constants.LONG_TIMEOUT, True, True, False),
(
Element(
Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, False)),
Element(
Element.Kind.INVOCATION_TRANSMISSION,
Transmission(False, True, True)),
Element(
Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)),
),
Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
# TODO(issue 2959): Finish this test suite. This tuple of sequences should
# contain at least the values in the Cartesian product of (half-duplex,
# full-duplex) * (zero payloads, one payload, test_constants.STREAM_LENGTH
# payloads) * (completion, cancellation, expiration, programming defect in
# servicer code).
SEQUENCES = (
_EASY,
_PEASY,
)

@ -0,0 +1,55 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Part of the tests of the base interface of RPC Framework."""
class OperationState(object):
def __init__(self):
self.invocation_initial_metadata_in_flight = None
self.invocation_initial_metadata_received = False
self.invocation_payloads_in_flight = []
self.invocation_payloads_received = 0
self.invocation_completion_in_flight = None
self.invocation_completion_received = False
self.service_initial_metadata_in_flight = None
self.service_initial_metadata_received = False
self.service_payloads_in_flight = []
self.service_payloads_received = 0
self.service_completion_in_flight = None
self.service_completion_received = False
self.invocation_side_invocation_allowance = 1
self.invocation_side_service_allowance = 1
self.service_side_invocation_allowance = 1
self.service_side_service_allowance = 1
self.invocation_allowance_in_flight = 0
self.service_allowance_in_flight = 0
self.invocation_side_outcome = None
self.service_side_outcome = None

@ -0,0 +1,260 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of the base interface of RPC Framework."""
import logging
import random
import threading
import time
import unittest
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.base import utilities
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.base import _control
from grpc_test.framework.interfaces.base import test_interfaces
_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True))
_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome}
class _Serialization(test_interfaces.Serialization):
def serialize_request(self, request):
return request + request
def deserialize_request(self, serialized_request):
return serialized_request[:len(serialized_request) / 2]
def serialize_response(self, response):
return response * 3
def deserialize_response(self, serialized_response):
return serialized_response[2 * len(serialized_response) / 3:]
def _advance(quadruples, operator, controller):
try:
for quadruple in quadruples:
operator.advance(
initial_metadata=quadruple[0], payload=quadruple[1],
completion=quadruple[2], allowance=quadruple[3])
except Exception as e: # pylint: disable=broad-except
controller.failed('Exception on advance: %e' % e)
class _Operator(base.Operator):
def __init__(self, controller, on_advance, pool, operator_under_test):
self._condition = threading.Condition()
self._controller = controller
self._on_advance = on_advance
self._pool = pool
self._operator_under_test = operator_under_test
self._pending_advances = []
def set_operator_under_test(self, operator_under_test):
with self._condition:
self._operator_under_test = operator_under_test
pent_advances = self._pending_advances
self._pending_advances = []
pool = self._pool
controller = self._controller
if pool is None:
_advance(pent_advances, operator_under_test, controller)
else:
pool.submit(_advance, pent_advances, operator_under_test, controller)
def advance(
self, initial_metadata=None, payload=None, completion=None,
allowance=None):
on_advance = self._on_advance(
initial_metadata, payload, completion, allowance)
if on_advance.kind is _control.OnAdvance.Kind.ADVANCE:
with self._condition:
pool = self._pool
operator_under_test = self._operator_under_test
controller = self._controller
quadruple = (
on_advance.initial_metadata, on_advance.payload,
on_advance.completion, on_advance.allowance)
if pool is None:
_advance((quadruple,), operator_under_test, controller)
else:
pool.submit(_advance, (quadruple,), operator_under_test, controller)
elif on_advance.kind is _control.OnAdvance.Kind.DEFECT:
raise ValueError(
'Deliberately raised exception from Operator.advance (in a test)!')
class _Servicer(base.Servicer):
"""An base.Servicer with instrumented for testing."""
def __init__(self, group, method, controllers, pool):
self._condition = threading.Condition()
self._group = group
self._method = method
self._pool = pool
self._controllers = list(controllers)
def service(self, group, method, context, output_operator):
with self._condition:
controller = self._controllers.pop(0)
if group != self._group or method != self._method:
controller.fail(
'%s != %s or %s != %s' % (group, self._group, method, self._method))
raise base.NoSuchMethodError()
else:
operator = _Operator(
controller, controller.on_service_advance, self._pool,
output_operator)
outcome = context.add_termination_callback(
controller.service_on_termination)
if outcome is not None:
controller.service_on_termination(outcome)
return utilities.full_subscription(operator)
class _OperationTest(unittest.TestCase):
def setUp(self):
if self._synchronicity_variation:
self._pool = logging_pool.pool(test_constants.POOL_SIZE)
else:
self._pool = None
self._controller = self._controller_creator.controller(
self._implementation, self._randomness)
def tearDown(self):
if self._synchronicity_variation:
self._pool.shutdown(wait=True)
else:
self._pool = None
def test_operation(self):
invocation = self._controller.invocation()
if invocation.subscription_kind is base.Subscription.Kind.FULL:
test_operator = _Operator(
self._controller, self._controller.on_invocation_advance,
self._pool, None)
subscription = utilities.full_subscription(test_operator)
else:
# TODO(nathaniel): support and test other subscription kinds.
self.fail('Non-full subscriptions not yet supported!')
servicer = _Servicer(
invocation.group, invocation.method, (self._controller,), self._pool)
invocation_end, service_end, memo = self._implementation.instantiate(
{(invocation.group, invocation.method): _Serialization()}, servicer)
try:
invocation_end.start()
service_end.start()
operation_context, operator_under_test = invocation_end.operate(
invocation.group, invocation.method, subscription, invocation.timeout,
initial_metadata=invocation.initial_metadata, payload=invocation.payload,
completion=invocation.completion)
test_operator.set_operator_under_test(operator_under_test)
outcome = operation_context.add_termination_callback(
self._controller.invocation_on_termination)
if outcome is not None:
self._controller.invocation_on_termination(outcome)
except Exception as e: # pylint: disable=broad-except
self._controller.failed('Exception on invocation: %s' % e)
self.fail(e)
while True:
instruction = self._controller.poll()
if instruction.kind is _control.Instruction.Kind.ADVANCE:
try:
test_operator.advance(
*instruction.advance_args, **instruction.advance_kwargs)
except Exception as e: # pylint: disable=broad-except
self._controller.failed('Exception on instructed advance: %s' % e)
elif instruction.kind is _control.Instruction.Kind.CANCEL:
try:
operation_context.cancel()
except Exception as e: # pylint: disable=broad-except
self._controller.failed('Exception on cancel: %s' % e)
elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
break
invocation_end.stop_gracefully()
service_end.stop_gracefully()
invocation_stats = invocation_end.operation_stats()
service_stats = service_end.operation_stats()
self._implementation.destantiate(memo)
self.assertTrue(
instruction.conclude_success, msg=instruction.conclude_message)
expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT)
expected_invocation_stats[instruction.conclude_invocation_outcome] += 1
self.assertDictEqual(expected_invocation_stats, invocation_stats)
expected_service_stats = dict(_EMPTY_OUTCOME_DICT)
expected_service_stats[instruction.conclude_service_outcome] += 1
self.assertDictEqual(expected_service_stats, service_stats)
def test_cases(implementation):
"""Creates unittest.TestCase classes for a given Base implementation.
Args:
implementation: A test_interfaces.Implementation specifying creation and
destruction of the Base implementation under test.
Returns:
A sequence of subclasses of unittest.TestCase defining tests of the
specified Base layer implementation.
"""
random_seed = hash(time.time())
logging.warning('Random seed for this execution: %s', random_seed)
randomness = random.Random(x=random_seed)
test_case_classes = []
for synchronicity_variation in _SYNCHRONICITY_VARIATION:
for controller_creator in _control.CONTROLLER_CREATORS:
name = ''.join(
(synchronicity_variation[0], controller_creator.name(), 'Test',))
test_case_classes.append(
type(name, (_OperationTest,),
{'_implementation': implementation,
'_randomness': randomness,
'_synchronicity_variation': synchronicity_variation[1],
'_controller_creator': controller_creator,
}))
return test_case_classes

@ -0,0 +1,186 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces used in tests of implementations of the Base layer."""
import abc
from grpc.framework.interfaces.base import base # pylint: disable=unused-import
class Serialization(object):
"""Specifies serialization and deserialization of test payloads."""
__metaclass__ = abc.ABCMeta
def serialize_request(self, request):
"""Serializes a request value used in a test.
Args:
request: A request value created by a test.
Returns:
A bytestring that is the serialization of the given request.
"""
raise NotImplementedError()
def deserialize_request(self, serialized_request):
"""Deserializes a request value used in a test.
Args:
serialized_request: A bytestring that is the serialization of some request
used in a test.
Returns:
The request value encoded by the given bytestring.
"""
raise NotImplementedError()
def serialize_response(self, response):
"""Serializes a response value used in a test.
Args:
response: A response value created by a test.
Returns:
A bytestring that is the serialization of the given response.
"""
raise NotImplementedError()
def deserialize_response(self, serialized_response):
"""Deserializes a response value used in a test.
Args:
serialized_response: A bytestring that is the serialization of some
response used in a test.
Returns:
The response value encoded by the given bytestring.
"""
raise NotImplementedError()
class Implementation(object):
"""Specifies an implementation of the Base layer."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def instantiate(self, serializations, servicer):
"""Instantiates the Base layer implementation to be used in a test.
Args:
serializations: A dict from group-method pair to Serialization object
specifying how to serialize and deserialize payload values used in the
test.
servicer: A base.Servicer object to be called to service RPCs made during
the test.
Returns:
A sequence of length three the first element of which is a
base.End to be used to invoke RPCs, the second element of which is a
base.End to be used to service invoked RPCs, and the third element of
which is an arbitrary memo object to be kept and passed to destantiate
at the conclusion of the test.
"""
raise NotImplementedError()
@abc.abstractmethod
def destantiate(self, memo):
"""Destroys the Base layer implementation under test.
Args:
memo: The object from the third position of the return value of a call to
instantiate.
"""
raise NotImplementedError()
@abc.abstractmethod
def invocation_initial_metadata(self):
"""Provides an operation's invocation-side initial metadata.
Returns:
A value to use for an operation's invocation-side initial metadata, or
None.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_initial_metadata(self):
"""Provices an operation's service-side initial metadata.
Returns:
A value to use for an operation's service-side initial metadata, or
None.
"""
raise NotImplementedError()
@abc.abstractmethod
def invocation_completion(self):
"""Provides an operation's invocation-side completion.
Returns:
A base.Completion to use for an operation's invocation-side completion.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_completion(self):
"""Provides an operation's service-side completion.
Returns:
A base.Completion to use for an operation's service-side completion.
"""
raise NotImplementedError()
@abc.abstractmethod
def metadata_transmitted(self, original_metadata, transmitted_metadata):
"""Identifies whether or not metadata was properly transmitted.
Args:
original_metadata: A metadata value passed to the system under test.
transmitted_metadata: The same metadata value after having been
transmitted through the system under test.
Returns:
Whether or not the metadata was properly transmitted.
"""
raise NotImplementedError()
@abc.abstractmethod
def completion_transmitted(self, original_completion, transmitted_completion):
"""Identifies whether or not a base.Completion was properly transmitted.
Args:
original_completion: A base.Completion passed to the system under test.
transmitted_completion: The same completion value after having been
transmitted through the system under test.
Returns:
Whether or not the completion was properly transmitted.
"""
raise NotImplementedError()
Loading…
Cancel
Save