gRPC Python test infrastructure

(The time-related first part of it, anyway.)
pull/11901/head
Nathaniel Manista 7 years ago
parent 1c0b20dda7
commit 69b7231776
  1. 5
      .pylintrc
  2. 119
      src/python/grpcio_testing/grpc_testing/__init__.py
  3. 224
      src/python/grpcio_testing/grpc_testing/_time.py
  4. 17
      src/python/grpcio_testing/grpc_version.py
  5. 44
      src/python/grpcio_testing/setup.py
  6. 13
      src/python/grpcio_tests/tests/testing/__init__.py
  7. 165
      src/python/grpcio_tests/tests/testing/_time_test.py
  8. 2
      src/python/grpcio_tests/tests/tests.json
  9. 1
      tools/distrib/pylint_code.sh
  10. 1
      tools/distrib/yapf_code.sh
  11. 3
      tools/run_tests/helper_scripts/build_python.sh

@ -41,6 +41,11 @@ disable=
# NOTE(nathaniel): We don't write doc strings for most private code
# elements.
missing-docstring,
# NOTE(nathaniel): In numeric comparisons it is better to have the
# lesser (or lesser-or-equal-to) quantity on the left when the
# expression is true than it is to worry about which is an identifier
# and which a literal value.
misplaced-comparison-constant,
# NOTE(nathaniel): Our completely abstract interface classes don't have
# constructors.
no-init,

@ -0,0 +1,119 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Objects for use in testing gRPC Python-using application code."""
import abc
import six
import grpc
class Time(six.with_metaclass(abc.ABCMeta)):
"""A simulation of time.
Implementations needn't be connected with real time as provided by the
Python interpreter, but as long as systems under test use
RpcContext.is_active and RpcContext.time_remaining for querying RPC liveness
implementations may be used to change passage of time in tests.
"""
@abc.abstractmethod
def time(self):
"""Accesses the current test time.
Returns:
The current test time (over which this object has authority).
"""
raise NotImplementedError()
@abc.abstractmethod
def call_in(self, behavior, delay):
"""Adds a behavior to be called after some time.
Args:
behavior: A behavior to be called with no arguments.
delay: A duration of time in seconds after which to call the behavior.
Returns:
A grpc.Future with which the call of the behavior may be cancelled
before it is executed.
"""
raise NotImplementedError()
@abc.abstractmethod
def call_at(self, behavior, time):
"""Adds a behavior to be called at a specific time.
Args:
behavior: A behavior to be called with no arguments.
time: The test time at which to call the behavior.
Returns:
A grpc.Future with which the call of the behavior may be cancelled
before it is executed.
"""
raise NotImplementedError()
@abc.abstractmethod
def sleep_for(self, duration):
"""Blocks for some length of test time.
Args:
duration: A duration of test time in seconds for which to block.
"""
raise NotImplementedError()
@abc.abstractmethod
def sleep_until(self, time):
"""Blocks until some test time.
Args:
time: The test time until which to block.
"""
raise NotImplementedError()
def strict_real_time():
"""Creates a Time backed by the Python interpreter's time.
The returned instance will be "strict" with respect to callbacks
submitted to it: it will ensure that all callbacks registered to
be called at time t have been called before it describes the time
as having advanced beyond t.
Returns:
A Time backed by the "system" (Python interpreter's) time.
"""
from grpc_testing import _time
return _time.StrictRealTime()
def strict_fake_time(now):
"""Creates a Time that can be manipulated by test code.
The returned instance maintains an internal representation of time
independent of real time. This internal representation only advances
when user code calls the instance's sleep_for and sleep_until methods.
The returned instance will be "strict" with respect to callbacks
submitted to it: it will ensure that all callbacks registered to
be called at time t have been called before it describes the time
as having advanced beyond t.
Returns:
A Time that simulates the passage of time.
"""
from grpc_testing import _time
return _time.StrictFakeTime(now)

@ -0,0 +1,224 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test times."""
import collections
import logging
import threading
import time as _time
import grpc
import grpc_testing
def _call(behaviors):
for behavior in behaviors:
try:
behavior()
except Exception: # pylint: disable=broad-except
logging.exception('Exception calling behavior "%r"!', behavior)
def _call_in_thread(behaviors):
calling = threading.Thread(target=_call, args=(behaviors,))
calling.start()
# NOTE(nathaniel): Because this function is called from "strict" Time
# implementations, it blocks until after all behaviors have terminated.
calling.join()
class _State(object):
def __init__(self):
self.condition = threading.Condition()
self.times_to_behaviors = collections.defaultdict(list)
class _Delta(
collections.namedtuple('_Delta',
('mature_behaviors', 'earliest_mature_time',
'earliest_immature_time',))):
pass
def _process(state, now):
mature_behaviors = []
earliest_mature_time = None
while state.times_to_behaviors:
earliest_time = min(state.times_to_behaviors)
if earliest_time <= now:
if earliest_mature_time is None:
earliest_mature_time = earliest_time
earliest_mature_behaviors = state.times_to_behaviors.pop(
earliest_time)
mature_behaviors.extend(earliest_mature_behaviors)
else:
earliest_immature_time = earliest_time
break
else:
earliest_immature_time = None
return _Delta(mature_behaviors, earliest_mature_time,
earliest_immature_time)
class _Future(grpc.Future):
def __init__(self, state, behavior, time):
self._state = state
self._behavior = behavior
self._time = time
self._cancelled = False
def cancel(self):
with self._state.condition:
if self._cancelled:
return True
else:
behaviors_at_time = self._state.times_to_behaviors.get(
self._time)
if behaviors_at_time is None:
return False
else:
behaviors_at_time.remove(self._behavior)
if not behaviors_at_time:
self._state.times_to_behaviors.pop(self._time)
self._state.condition.notify_all()
self._cancelled = True
return True
def cancelled(self):
with self._state.condition:
return self._cancelled
def running(self):
raise NotImplementedError()
def done(self):
raise NotImplementedError()
def result(self, timeout=None):
raise NotImplementedError()
def exception(self, timeout=None):
raise NotImplementedError()
def traceback(self, timeout=None):
raise NotImplementedError()
def add_done_callback(self, fn):
raise NotImplementedError()
class StrictRealTime(grpc_testing.Time):
def __init__(self):
self._state = _State()
self._active = False
self._calling = None
def _activity(self):
while True:
with self._state.condition:
while True:
now = _time.time()
delta = _process(self._state, now)
self._state.condition.notify_all()
if delta.mature_behaviors:
self._calling = delta.earliest_mature_time
break
self._calling = None
if delta.earliest_immature_time is None:
self._active = False
return
else:
timeout = max(0, delta.earliest_immature_time - now)
self._state.condition.wait(timeout=timeout)
_call(delta.mature_behaviors)
def _ensure_called_through(self, time):
with self._state.condition:
while ((self._state.times_to_behaviors and
min(self._state.times_to_behaviors) < time) or
(self._calling is not None and self._calling < time)):
self._state.condition.wait()
def _call_at(self, behavior, time):
with self._state.condition:
self._state.times_to_behaviors[time].append(behavior)
if self._active:
self._state.condition.notify_all()
else:
activity = threading.Thread(target=self._activity)
activity.start()
self._active = True
return _Future(self._state, behavior, time)
def time(self):
return _time.time()
def call_in(self, behavior, delay):
return self._call_at(behavior, _time.time() + delay)
def call_at(self, behavior, time):
return self._call_at(behavior, time)
def sleep_for(self, duration):
time = _time.time() + duration
_time.sleep(duration)
self._ensure_called_through(time)
def sleep_until(self, time):
_time.sleep(max(0, time - _time.time()))
self._ensure_called_through(time)
class StrictFakeTime(grpc_testing.Time):
def __init__(self, time):
self._state = _State()
self._time = time
def time(self):
return self._time
def call_in(self, behavior, delay):
if delay <= 0:
_call_in_thread((behavior,))
else:
with self._state.condition:
time = self._time + delay
self._state.times_to_behaviors[time].append(behavior)
return _Future(self._state, behavior, time)
def call_at(self, behavior, time):
with self._state.condition:
if time <= self._time:
_call_in_thread((behavior,))
else:
self._state.times_to_behaviors[time].append(behavior)
return _Future(self._state, behavior, time)
def sleep_for(self, duration):
if 0 < duration:
with self._state.condition:
self._time += duration
delta = _process(self._state, self._time)
_call_in_thread(delta.mature_behaviors)
def sleep_until(self, time):
with self._state.condition:
if self._time < time:
self._time = time
delta = _process(self._state, self._time)
_call_in_thread(delta.mature_behaviors)

@ -0,0 +1,17 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!!
VERSION = '1.5.0.dev0'

@ -0,0 +1,44 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Setup module for gRPC Python's testing package."""
import os
import sys
import setuptools
# Ensure we're in the proper directory whether or not we're being used by pip.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Break import style to ensure that we can find same-directory modules.
import grpc_version
PACKAGE_DIRECTORIES = {
'': '.',
}
INSTALL_REQUIRES = ('protobuf>=3.3.0',
'grpcio>={version}'.format(version=grpc_version.VERSION),)
setuptools.setup(
name='grpcio-testing',
version=grpc_version.VERSION,
license='Apache License 2.0',
description='Testing utilities for gRPC Python',
author='The gRPC Authors',
author_email='grpc-io@googlegroups.com',
url='https://grpc.io',
package_dir=PACKAGE_DIRECTORIES,
packages=setuptools.find_packages('.'),
install_requires=INSTALL_REQUIRES)

@ -0,0 +1,13 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,165 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import threading
import time
import unittest
import grpc_testing
_QUANTUM = 0.3
_MANY = 10000
# Tests that run in real time can either wait for the scheduler to
# eventually run what needs to be run (and risk timing out) or declare
# that the scheduler didn't schedule work reasonably fast enough. We
# choose the latter for this test.
_PATHOLOGICAL_SCHEDULING = 'pathological thread scheduling!'
class _TimeNoter(object):
def __init__(self, time):
self._condition = threading.Condition()
self._time = time
self._call_times = []
def __call__(self):
with self._condition:
self._call_times.append(self._time.time())
def call_times(self):
with self._condition:
return tuple(self._call_times)
class TimeTest(object):
def test_sleep_for(self):
start_time = self._time.time()
self._time.sleep_for(_QUANTUM)
end_time = self._time.time()
self.assertLessEqual(start_time + _QUANTUM, end_time)
def test_sleep_until(self):
start_time = self._time.time()
self._time.sleep_until(start_time + _QUANTUM)
end_time = self._time.time()
self.assertLessEqual(start_time + _QUANTUM, end_time)
def test_call_in(self):
time_noter = _TimeNoter(self._time)
start_time = self._time.time()
self._time.call_in(time_noter, _QUANTUM)
self._time.sleep_for(_QUANTUM * 2)
call_times = time_noter.call_times()
self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
self.assertLessEqual(start_time + _QUANTUM, call_times[0])
def test_call_at(self):
time_noter = _TimeNoter(self._time)
start_time = self._time.time()
self._time.call_at(time_noter, self._time.time() + _QUANTUM)
self._time.sleep_for(_QUANTUM * 2)
call_times = time_noter.call_times()
self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
self.assertLessEqual(start_time + _QUANTUM, call_times[0])
def test_cancel(self):
time_noter = _TimeNoter(self._time)
future = self._time.call_in(time_noter, _QUANTUM * 2)
self._time.sleep_for(_QUANTUM)
cancelled = future.cancel()
self._time.sleep_for(_QUANTUM * 2)
call_times = time_noter.call_times()
self.assertFalse(call_times, msg=_PATHOLOGICAL_SCHEDULING)
self.assertTrue(cancelled)
self.assertTrue(future.cancelled())
def test_many(self):
test_events = tuple(threading.Event() for _ in range(_MANY))
possibly_cancelled_futures = {}
background_noise_futures = []
for test_event in test_events:
possibly_cancelled_futures[test_event] = self._time.call_in(
test_event.set, _QUANTUM * (2 + random.random()))
for _ in range(_MANY):
background_noise_futures.append(
self._time.call_in(threading.Event().set, _QUANTUM * 1000 *
random.random()))
self._time.sleep_for(_QUANTUM)
cancelled = set()
for test_event, test_future in possibly_cancelled_futures.items():
if bool(random.randint(0, 1)) and test_future.cancel():
cancelled.add(test_event)
self._time.sleep_for(_QUANTUM * 3)
for test_event in test_events:
(self.assertFalse if test_event in cancelled else
self.assertTrue)(test_event.is_set())
for background_noise_future in background_noise_futures:
background_noise_future.cancel()
def test_same_behavior_used_several_times(self):
time_noter = _TimeNoter(self._time)
start_time = self._time.time()
first_future_at_one = self._time.call_in(time_noter, _QUANTUM)
second_future_at_one = self._time.call_in(time_noter, _QUANTUM)
first_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
second_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
self._time.sleep_for(_QUANTUM * 2)
first_future_at_one_cancelled = first_future_at_one.cancel()
second_future_at_one_cancelled = second_future_at_one.cancel()
first_future_at_three_cancelled = first_future_at_three.cancel()
self._time.sleep_for(_QUANTUM * 2)
second_future_at_three_cancelled = second_future_at_three.cancel()
first_future_at_three_cancelled_again = first_future_at_three.cancel()
call_times = time_noter.call_times()
self.assertEqual(3, len(call_times), msg=_PATHOLOGICAL_SCHEDULING)
self.assertFalse(first_future_at_one_cancelled)
self.assertFalse(second_future_at_one_cancelled)
self.assertTrue(first_future_at_three_cancelled)
self.assertFalse(second_future_at_three_cancelled)
self.assertTrue(first_future_at_three_cancelled_again)
self.assertLessEqual(start_time + _QUANTUM, call_times[0])
self.assertLessEqual(start_time + _QUANTUM, call_times[1])
self.assertLessEqual(start_time + _QUANTUM * 3, call_times[2])
class StrictRealTimeTest(TimeTest, unittest.TestCase):
def setUp(self):
self._time = grpc_testing.strict_real_time()
class StrictFakeTimeTest(TimeTest, unittest.TestCase):
def setUp(self):
self._time = grpc_testing.strict_fake_time(
random.randint(0, int(time.time())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -9,6 +9,8 @@
"protoc_plugin._split_definitions_test.SplitSeparateTest",
"protoc_plugin.beta_python_plugin_test.PythonPluginTest",
"reflection._reflection_servicer_test.ReflectionServicerTest",
"testing._time_test.StrictFakeTimeTest",
"testing._time_test.StrictRealTimeTest",
"unit._api_test.AllTest",
"unit._api_test.ChannelConnectivityTest",
"unit._api_test.ChannelTest",

@ -22,6 +22,7 @@ DIRS=(
'src/python/grpcio/grpc'
'src/python/grpcio_health_checking/grpc_health'
'src/python/grpcio_reflection/grpc_reflection'
'src/python/grpcio_testing/grpc_testing'
)
VIRTUALENV=python_pylint_venv

@ -25,6 +25,7 @@ EXCLUSIONS=(
'grpcio/grpc_*.py'
'grpcio_health_checking/grpc_*.py'
'grpcio_reflection/grpc_*.py'
'grpcio_testing/grpc_*.py'
'grpcio_tests/grpc_*.py'
)

@ -171,6 +171,9 @@ $VENV_PYTHON $ROOT/src/python/grpcio_reflection/setup.py preprocess
$VENV_PYTHON $ROOT/src/python/grpcio_reflection/setup.py build_package_protos
pip_install_dir $ROOT/src/python/grpcio_reflection
# Install testing
pip_install_dir $ROOT/src/python/grpcio_testing
# Build/install tests
$VENV_PYTHON -m pip install coverage==4.4 oauth2client==4.1.0 \
google-auth==1.0.0 requests==2.14.2

Loading…
Cancel
Save