Remove alpha tests

Warnings will be emitted when trying to import through alpha packages.
pull/3963/head
Masood Malekghassemi 9 years ago
parent dae3dad476
commit d292131093
  1. 5
      src/python/grpcio/grpc/early_adopter/__init__.py
  2. 7
      src/python/grpcio/grpc/framework/alpha/__init__.py
  3. 5
      src/python/grpcio/grpc/framework/base/__init__.py
  4. 5
      src/python/grpcio/grpc/framework/face/__init__.py
  5. 541
      src/python/grpcio_test/grpc_protoc_plugin/alpha_python_plugin_test.py
  6. 541
      src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
  7. 46
      src/python/grpcio_test/grpc_test/_adapter/_blocking_invocation_inline_service_test.py
  8. 46
      src/python/grpcio_test/grpc_test/_adapter/_event_invocation_synchronous_event_service_test.py
  9. 106
      src/python/grpcio_test/grpc_test/_adapter/_face_test_case.py
  10. 46
      src/python/grpcio_test/grpc_test/_adapter/_future_invocation_asynchronous_event_service_test.py
  11. 277
      src/python/grpcio_test/grpc_test/_adapter/_links_test.py
  12. 100
      src/python/grpcio_test/grpc_test/_adapter/_lonely_rear_link_test.py
  13. 80
      src/python/grpcio_test/grpc_test/_adapter/_test_links.py
  14. 30
      src/python/grpcio_test/grpc_test/early_adopter/__init__.py
  15. 180
      src/python/grpcio_test/grpc_test/early_adopter/implementations_test.py
  16. 30
      src/python/grpcio_test/grpc_test/framework/base/__init__.py
  17. 80
      src/python/grpcio_test/grpc_test/framework/base/implementations_test.py
  18. 307
      src/python/grpcio_test/grpc_test/framework/base/interfaces_test_case.py
  19. 61
      src/python/grpcio_test/grpc_test/framework/face/_test_case.py
  20. 46
      src/python/grpcio_test/grpc_test/framework/face/blocking_invocation_inline_service_test.py
  21. 46
      src/python/grpcio_test/grpc_test/framework/face/event_invocation_synchronous_event_service_test.py
  22. 46
      src/python/grpcio_test/grpc_test/framework/face/future_invocation_asynchronous_event_service_test.py
  23. 70
      src/python/grpcio_test/grpc_test/framework/face/testing/serial.py

@ -27,4 +27,9 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import warnings
warnings.simplefilter('always', DeprecationWarning)
warnings.warn('the alpha API (includes this package) is deprecated, '
'unmaintained, and no longer tested. Please migrate to the beta '
'API.', DeprecationWarning, stacklevel=2)

@ -26,3 +26,10 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import warnings
warnings.simplefilter('always', DeprecationWarning)
warnings.warn('the alpha API (includes this package) is deprecated, '
'unmaintained, and no longer tested. Please migrate to the beta '
'API.', DeprecationWarning, stacklevel=2)

@ -27,4 +27,9 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import warnings
warnings.simplefilter('always', DeprecationWarning)
warnings.warn('the alpha API (includes this package) is deprecated, '
'unmaintained, and no longer tested. Please migrate to the beta '
'API.', DeprecationWarning, stacklevel=2)

@ -27,4 +27,9 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import warnings
warnings.simplefilter('always', DeprecationWarning)
warnings.warn('the alpha API (includes this package) is deprecated, '
'unmaintained, and no longer tested. Please migrate to the beta '
'API.', DeprecationWarning, stacklevel=2)

@ -1,541 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import contextlib
import distutils.spawn
import errno
import itertools
import os
import pkg_resources
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
from grpc.framework.alpha import exceptions
from grpc.framework.foundation import future
# Identifiers of entities we expect to find in the generated module.
SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer'
SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer'
STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub'
SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server'
STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub'
# The timeout used in tests of RPCs that are supposed to expire.
SHORT_TIMEOUT = 2
# The timeout used in tests of RPCs that are not supposed to expire. The
# absurdly large value doesn't matter since no passing execution of this test
# module will ever wait the duration.
LONG_TIMEOUT = 600
NO_DELAY = 0
class _ServicerMethods(object):
def __init__(self, test_pb2, delay):
self._condition = threading.Condition()
self._delay = delay
self._paused = False
self._fail = False
self._test_pb2 = test_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
with self._condition:
self._paused = True
yield
with self._condition:
self._paused = False
self._condition.notify_all()
@contextlib.contextmanager
def fail(self): # pylint: disable=invalid-name
with self._condition:
self._fail = True
yield
with self._condition:
self._fail = False
def _control(self): # pylint: disable=invalid-name
with self._condition:
if self._fail:
raise ValueError()
while self._paused:
self._condition.wait()
time.sleep(self._delay)
def UnaryCall(self, request, unused_rpc_context):
response = self._test_pb2.SimpleResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def StreamingInputCall(self, request_iter, unused_rpc_context):
response = self._test_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
response.aggregated_payload_size = aggregated_payload_size
self._control()
return response
def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def HalfDuplexCall(self, request_iter, unused_rpc_context):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
for response in responses:
yield response
@contextlib.contextmanager
def _CreateService(test_pb2, delay):
"""Provides a servicer backend and a stub.
The servicer is just the implementation
of the actual servicer passed to the face player of the python RPC
implementation; the two are detached.
Non-zero delay puts a delay on each call to the servicer, representative of
communication latency. Timeout is the default timeout for the stub while
waiting for the service.
Args:
test_pb2: The test_pb2 module generated by this test.
delay: Delay in seconds per response from the servicer.
Yields:
A (servicer_methods, servicer, stub) three-tuple where servicer_methods is
the back-end of the service bound to the stub and the server and stub
are both activated and ready for use.
"""
servicer_methods = _ServicerMethods(test_pb2, delay)
class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
def UnaryCall(self, request, context):
return servicer_methods.UnaryCall(request, context)
def StreamingOutputCall(self, request, context):
return servicer_methods.StreamingOutputCall(request, context)
def StreamingInputCall(self, request_iter, context):
return servicer_methods.StreamingInputCall(request_iter, context)
def FullDuplexCall(self, request_iter, context):
return servicer_methods.FullDuplexCall(request_iter, context)
def HalfDuplexCall(self, request_iter, context):
return servicer_methods.HalfDuplexCall(request_iter, context)
servicer = Servicer()
server = getattr(
test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0)
with server:
port = server.port()
stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port)
with stub:
yield servicer_methods, stub, server
def _streaming_input_request_iterator(test_pb2):
for _ in range(3):
request = test_pb2.StreamingInputCallRequest()
request.payload.payload_type = test_pb2.COMPRESSABLE
request.payload.payload_compressable = 'a'
yield request
def _streaming_output_request(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
request.response_parameters.add(size=sizes[1], interval_us=0)
request.response_parameters.add(size=sizes[2], interval_us=0)
return request
def _full_duplex_request_iterator(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
class PythonPluginTest(unittest.TestCase):
"""Test case for the gRPC Python protoc-plugin.
While reading these tests, remember that the futures API
(`stub.method.async()`) only gives futures for the *non-streaming* responses,
else it behaves like its blocking cousin.
"""
def setUp(self):
# Assume that the appropriate protoc and grpc_python_plugins are on the
# path.
protoc_command = 'protoc'
protoc_plugin_filename = distutils.spawn.find_executable(
'grpc_python_plugin')
test_proto_filename = pkg_resources.resource_filename(
'grpc_protoc_plugin', 'test.proto')
if not os.path.isfile(protoc_command):
# Assume that if we haven't built protoc that it's on the system.
protoc_command = 'protoc'
# Ensure that the output directory exists.
self.outdir = tempfile.mkdtemp()
# Invoke protoc with the plugin.
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
'-I .',
'--python_out=%s' % self.outdir,
'--python-grpc_out=%s' % self.outdir,
os.path.basename(test_proto_filename),
]
subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
cwd=os.path.dirname(test_proto_filename))
sys.path.append(self.outdir)
def tearDown(self):
try:
shutil.rmtree(self.outdir)
except OSError as exc:
if exc.errno != errno.ENOENT:
raise
# TODO(atash): Figure out which of these tests is hanging flakily with small
# probability.
def testImportAttributes(self):
# check that we can access the generated module and its members.
import test_pb2 # pylint: disable=g-import-not-at-top
self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
def testUpDown(self):
import test_pb2
with _CreateService(
test_pb2, NO_DELAY) as (servicer, stub, unused_server):
request = test_pb2.SimpleRequest(response_size=13)
def testUnaryCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
request = test_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, timeout)
expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testUnaryCallAsync(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
# Check that the call does not block waiting for the server to respond.
with methods.pause():
response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testUnaryCallAsyncExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
request = test_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testUnaryCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.UnaryCall.async(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
def testUnaryCallAsyncFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
responses = stub.StreamingOutputCall(request, LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall(
request, 'not a real RpcContext!')
for expected_response, response in itertools.izip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testStreamingOutputCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testStreamingOutputCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
unused_methods, stub, unused_server):
responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
next(responses)
responses.cancel()
with self.assertRaises(future.CancelledError):
next(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this times out '
'instead of raising the proper error.')
def testStreamingOutputCallFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
with self.assertRaises(exceptions.ServicerError):
next(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testStreamingInputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
response = stub.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
expected_response = methods.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallAsync(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallAsyncExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
def testStreamingInputCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), timeout)
response_future.cancel()
self.assertTrue(response_future.cancelled())
with self.assertRaises(future.CancelledError):
response_future.result()
def testStreamingInputCallAsyncFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
responses = stub.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
for expected_response, response in itertools.izip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testFullDuplexCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testFullDuplexCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
request_iterator = _full_duplex_request_iterator(test_pb2)
responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
next(responses)
responses.cancel()
with self.assertRaises(future.CancelledError):
next(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever '
'and fix.')
def testFullDuplexCallFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
self.assertIsNotNone(responses)
with self.assertRaises(exceptions.ServicerError):
next(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testHalfDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), LONG_TIMEOUT)
expected_responses = methods.HalfDuplexCall(
half_duplex_request_iterator(), 'not a real RpcContext!')
for check in itertools.izip_longest(expected_responses, responses):
expected_response, response = check
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
import test_pb2 # pylint: disable=g-import-not-at-top
condition = threading.Condition()
wait_cell = [False]
@contextlib.contextmanager
def wait(): # pylint: disable=invalid-name
# Where's Python 3's 'nonlocal' statement when you need it?
with condition:
wait_cell[0] = True
yield
with condition:
wait_cell[0] = False
condition.notify_all()
def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
with condition:
while wait_cell[0]:
condition.wait()
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
with wait():
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), SHORT_TIMEOUT)
# half-duplex waits for the client to send all info
with self.assertRaises(exceptions.ExpirationError):
next(responses)
if __name__ == '__main__':
os.chdir(os.path.dirname(sys.argv[0]))
unittest.main(verbosity=2)

@ -1,541 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import contextlib
import distutils.spawn
import errno
import itertools
import os
import pkg_resources
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
from grpc.framework.alpha import exceptions
from grpc.framework.foundation import future
# Identifiers of entities we expect to find in the generated module.
SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer'
SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer'
STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub'
SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server'
STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub'
# The timeout used in tests of RPCs that are supposed to expire.
SHORT_TIMEOUT = 2
# The timeout used in tests of RPCs that are not supposed to expire. The
# absurdly large value doesn't matter since no passing execution of this test
# module will ever wait the duration.
LONG_TIMEOUT = 600
NO_DELAY = 0
class _ServicerMethods(object):
def __init__(self, test_pb2, delay):
self._condition = threading.Condition()
self._delay = delay
self._paused = False
self._fail = False
self._test_pb2 = test_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
with self._condition:
self._paused = True
yield
with self._condition:
self._paused = False
self._condition.notify_all()
@contextlib.contextmanager
def fail(self): # pylint: disable=invalid-name
with self._condition:
self._fail = True
yield
with self._condition:
self._fail = False
def _control(self): # pylint: disable=invalid-name
with self._condition:
if self._fail:
raise ValueError()
while self._paused:
self._condition.wait()
time.sleep(self._delay)
def UnaryCall(self, request, unused_rpc_context):
response = self._test_pb2.SimpleResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def StreamingInputCall(self, request_iter, unused_rpc_context):
response = self._test_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
response.aggregated_payload_size = aggregated_payload_size
self._control()
return response
def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def HalfDuplexCall(self, request_iter, unused_rpc_context):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
response = self._test_pb2.StreamingOutputCallResponse()
response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
for response in responses:
yield response
@contextlib.contextmanager
def _CreateService(test_pb2, delay):
"""Provides a servicer backend and a stub.
The servicer is just the implementation
of the actual servicer passed to the face player of the python RPC
implementation; the two are detached.
Non-zero delay puts a delay on each call to the servicer, representative of
communication latency. Timeout is the default timeout for the stub while
waiting for the service.
Args:
test_pb2: The test_pb2 module generated by this test.
delay: Delay in seconds per response from the servicer.
Yields:
A (servicer_methods, servicer, stub) three-tuple where servicer_methods is
the back-end of the service bound to the stub and the server and stub
are both activated and ready for use.
"""
servicer_methods = _ServicerMethods(test_pb2, delay)
class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
def UnaryCall(self, request, context):
return servicer_methods.UnaryCall(request, context)
def StreamingOutputCall(self, request, context):
return servicer_methods.StreamingOutputCall(request, context)
def StreamingInputCall(self, request_iter, context):
return servicer_methods.StreamingInputCall(request_iter, context)
def FullDuplexCall(self, request_iter, context):
return servicer_methods.FullDuplexCall(request_iter, context)
def HalfDuplexCall(self, request_iter, context):
return servicer_methods.HalfDuplexCall(request_iter, context)
servicer = Servicer()
server = getattr(
test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0)
with server:
port = server.port()
stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port)
with stub:
yield servicer_methods, stub, server
def _streaming_input_request_iterator(test_pb2):
for _ in range(3):
request = test_pb2.StreamingInputCallRequest()
request.payload.payload_type = test_pb2.COMPRESSABLE
request.payload.payload_compressable = 'a'
yield request
def _streaming_output_request(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
request.response_parameters.add(size=sizes[1], interval_us=0)
request.response_parameters.add(size=sizes[2], interval_us=0)
return request
def _full_duplex_request_iterator(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
class PythonPluginTest(unittest.TestCase):
"""Test case for the gRPC Python protoc-plugin.
While reading these tests, remember that the futures API
(`stub.method.async()`) only gives futures for the *non-streaming* responses,
else it behaves like its blocking cousin.
"""
def setUp(self):
# Assume that the appropriate protoc and grpc_python_plugins are on the
# path.
protoc_command = 'protoc'
protoc_plugin_filename = distutils.spawn.find_executable(
'grpc_python_plugin')
test_proto_filename = pkg_resources.resource_filename(
'grpc_protoc_plugin', 'test.proto')
if not os.path.isfile(protoc_command):
# Assume that if we haven't built protoc that it's on the system.
protoc_command = 'protoc'
# Ensure that the output directory exists.
self.outdir = tempfile.mkdtemp()
# Invoke protoc with the plugin.
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
'-I .',
'--python_out=%s' % self.outdir,
'--python-grpc_out=%s' % self.outdir,
os.path.basename(test_proto_filename),
]
subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
cwd=os.path.dirname(test_proto_filename))
sys.path.append(self.outdir)
def tearDown(self):
try:
shutil.rmtree(self.outdir)
except OSError as exc:
if exc.errno != errno.ENOENT:
raise
# TODO(atash): Figure out which of these tests is hanging flakily with small
# probability.
def testImportAttributes(self):
# check that we can access the generated module and its members.
import test_pb2 # pylint: disable=g-import-not-at-top
self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
def testUpDown(self):
import test_pb2
with _CreateService(
test_pb2, NO_DELAY) as (servicer, stub, unused_server):
request = test_pb2.SimpleRequest(response_size=13)
def testUnaryCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
request = test_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, timeout)
expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testUnaryCallAsync(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
# Check that the call does not block waiting for the server to respond.
with methods.pause():
response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testUnaryCallAsyncExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
request = test_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testUnaryCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.UnaryCall.async(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
def testUnaryCallAsyncFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
responses = stub.StreamingOutputCall(request, LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall(
request, 'not a real RpcContext!')
for expected_response, response in itertools.izip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testStreamingOutputCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testStreamingOutputCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
unused_methods, stub, unused_server):
responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
next(responses)
responses.cancel()
with self.assertRaises(future.CancelledError):
next(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this times out '
'instead of raising the proper error.')
def testStreamingOutputCallFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
with self.assertRaises(exceptions.ServicerError):
next(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testStreamingInputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
response = stub.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
expected_response = methods.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallAsync(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallAsyncExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
def testStreamingInputCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), timeout)
response_future.cancel()
self.assertTrue(response_future.cancelled())
with self.assertRaises(future.CancelledError):
response_future.result()
def testStreamingInputCallAsyncFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
response_future = stub.StreamingInputCall.async(
_streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
responses = stub.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
for expected_response, response in itertools.izip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testFullDuplexCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.pause():
responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testFullDuplexCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
request_iterator = _full_duplex_request_iterator(test_pb2)
responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
next(responses)
responses.cancel()
with self.assertRaises(future.CancelledError):
next(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever '
'and fix.')
def testFullDuplexCallFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
with methods.fail():
responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
self.assertIsNotNone(responses)
with self.assertRaises(exceptions.ServicerError):
next(responses)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testHalfDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (
methods, stub, unused_server):
def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), LONG_TIMEOUT)
expected_responses = methods.HalfDuplexCall(
half_duplex_request_iterator(), 'not a real RpcContext!')
for check in itertools.izip_longest(expected_responses, responses):
expected_response, response = check
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
import test_pb2 # pylint: disable=g-import-not-at-top
condition = threading.Condition()
wait_cell = [False]
@contextlib.contextmanager
def wait(): # pylint: disable=invalid-name
# Where's Python 3's 'nonlocal' statement when you need it?
with condition:
wait_cell[0] = True
yield
with condition:
wait_cell[0] = False
condition.notify_all()
def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
with condition:
while wait_cell[0]:
condition.wait()
with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
with wait():
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), SHORT_TIMEOUT)
# half-duplex waits for the client to send all info
with self.assertRaises(exceptions.ExpirationError):
next(responses)
if __name__ == '__main__':
os.chdir(os.path.dirname(sys.argv[0]))
unittest.main(verbosity=2)

@ -1,46 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from grpc_test._adapter import _face_test_case
from grpc_test.framework.face.testing import blocking_invocation_inline_service_test_case as test_case
class BlockingInvocationInlineServiceTest(
_face_test_case.FaceTestCase,
test_case.BlockingInvocationInlineServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,46 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from grpc_test._adapter import _face_test_case
from grpc_test.framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case
class EventInvocationSynchronousEventServiceTest(
_face_test_case.FaceTestCase,
test_case.EventInvocationSynchronousEventServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,106 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Common construction and destruction for GRPC-backed Face-layer tests."""
import unittest
from grpc._adapter import fore
from grpc._adapter import rear
from grpc.framework.base import util
from grpc.framework.base import implementations as base_implementations
from grpc.framework.face import implementations as face_implementations
from grpc.framework.foundation import logging_pool
from grpc_test.framework.face.testing import coverage
from grpc_test.framework.face.testing import serial
from grpc_test.framework.face.testing import test_case
_TIMEOUT = 3
_MAXIMUM_TIMEOUT = 90
_MAXIMUM_POOL_SIZE = 4
class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
"""Provides abstract Face-layer tests a GRPC-backed implementation."""
def set_up_implementation(
self, name, methods, method_implementations,
multi_method_implementation):
pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
servicer = face_implementations.servicer(
pool, method_implementations, multi_method_implementation)
serialization = serial.serialization(methods)
fore_link = fore.ForeLink(
pool, serialization.request_deserializers,
serialization.response_serializers, None, ())
fore_link.start()
port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, pool,
serialization.request_serializers,
serialization.response_deserializers, False, None, None, None)
rear_link.start()
front = base_implementations.front_link(pool, pool, pool)
back = base_implementations.back_link(
servicer, pool, pool, pool, _TIMEOUT, _MAXIMUM_TIMEOUT)
fore_link.join_rear_link(back)
back.join_fore_link(fore_link)
rear_link.join_fore_link(front)
front.join_rear_link(rear_link)
stub = face_implementations.generic_stub(front, pool)
return stub, (rear_link, fore_link, front, back)
def tear_down_implementation(self, memo):
rear_link, fore_link, front, back = memo
# TODO(nathaniel): Waiting for the front and back to idle possibly should
# not be necessary - investigate as part of graceful shutdown work.
util.wait_for_idle(front)
util.wait_for_idle(back)
rear_link.stop()
fore_link.stop()
@unittest.skip('Service-side failure not transmitted by GRPC.')
def testFailedUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@unittest.skip('Service-side failure not transmitted by GRPC.')
def testFailedUnaryRequestStreamResponse(self):
raise NotImplementedError()
@unittest.skip('Service-side failure not transmitted by GRPC.')
def testFailedStreamRequestUnaryResponse(self):
raise NotImplementedError()
@unittest.skip('Service-side failure not transmitted by GRPC.')
def testFailedStreamRequestStreamResponse(self):
raise NotImplementedError()

@ -1,46 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from grpc_test._adapter import _face_test_case
from grpc_test.framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case
class FutureInvocationAsynchronousEventServiceTest(
_face_test_case.FaceTestCase,
test_case.FutureInvocationAsynchronousEventServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,277 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test of the GRPC-backed ForeLink and RearLink."""
import threading
import unittest
from grpc._adapter import fore
from grpc._adapter import rear
from grpc.framework.base import interfaces
from grpc.framework.foundation import logging_pool
from grpc_test._adapter import _proto_scenarios
from grpc_test._adapter import _test_links
_IDENTITY = lambda x: x
_TIMEOUT = 32
# TODO(nathaniel): End-to-end metadata testing.
def _transform_metadata(unused_metadata):
return (
('one_unused_key', 'one unused value'),
('another_unused_key', 'another unused value'),
)
class RoundTripTest(unittest.TestCase):
def setUp(self):
self.fore_link_pool = logging_pool.pool(8)
self.rear_link_pool = logging_pool.pool(8)
def tearDown(self):
self.rear_link_pool.shutdown(wait=True)
self.fore_link_pool.shutdown(wait=True)
def testZeroMessageRoundTrip(self):
test_operation_id = object()
test_method = 'test method'
test_fore_link = _test_links.ForeLink(None, None)
def rear_action(front_to_back_ticket, fore_link):
if front_to_back_ticket.kind in (
interfaces.FrontToBackTicket.Kind.COMPLETION,
interfaces.FrontToBackTicket.Kind.ENTIRE):
back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, 0,
interfaces.BackToFrontTicket.Kind.COMPLETION, None)
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)
fore_link = fore.ForeLink(
self.fore_link_pool, {test_method: None}, {test_method: None}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
fore_link.start()
port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: None},
{test_method: None}, False, None, None, None,
metadata_transformer=_transform_metadata)
rear_link.join_fore_link(test_fore_link)
test_fore_link.join_rear_link(rear_link)
rear_link.start()
front_to_back_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
test_method, interfaces.ServicedSubscription.Kind.FULL, None, None,
_TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is
interfaces.BackToFrontTicket.Kind.CONTINUATION):
test_fore_link.condition.wait()
rear_link.stop()
fore_link.stop()
with test_fore_link.condition:
self.assertIs(
test_fore_link.tickets[-1].kind,
interfaces.BackToFrontTicket.Kind.COMPLETION)
def testEntireRoundTrip(self):
test_operation_id = object()
test_method = 'test method'
test_front_to_back_datum = b'\x07'
test_back_to_front_datum = b'\x08'
test_fore_link = _test_links.ForeLink(None, None)
rear_sequence_number = [0]
def rear_action(front_to_back_ticket, fore_link):
if front_to_back_ticket.payload is None:
payload = None
else:
payload = test_back_to_front_datum
terminal = front_to_back_ticket.kind in (
interfaces.FrontToBackTicket.Kind.COMPLETION,
interfaces.FrontToBackTicket.Kind.ENTIRE)
if payload is not None or terminal:
if terminal:
kind = interfaces.BackToFrontTicket.Kind.COMPLETION
else:
kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
payload)
rear_sequence_number[0] += 1
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)
fore_link = fore.ForeLink(
self.fore_link_pool, {test_method: _IDENTITY},
{test_method: _IDENTITY}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
fore_link.start()
port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: _IDENTITY},
{test_method: _IDENTITY}, False, None, None, None)
rear_link.join_fore_link(test_fore_link)
test_fore_link.join_rear_link(rear_link)
rear_link.start()
front_to_back_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
test_method, interfaces.ServicedSubscription.Kind.FULL, None,
test_front_to_back_datum, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not
interfaces.BackToFrontTicket.Kind.COMPLETION):
test_fore_link.condition.wait()
rear_link.stop()
fore_link.stop()
with test_rear_link.condition:
front_to_back_payloads = tuple(
ticket.payload for ticket in test_rear_link.tickets
if ticket.payload is not None)
with test_fore_link.condition:
back_to_front_payloads = tuple(
ticket.payload for ticket in test_fore_link.tickets
if ticket.payload is not None)
self.assertTupleEqual((test_front_to_back_datum,), front_to_back_payloads)
self.assertTupleEqual((test_back_to_front_datum,), back_to_front_payloads)
def _perform_scenario_test(self, scenario):
test_operation_id = object()
test_method = scenario.method()
test_fore_link = _test_links.ForeLink(None, None)
rear_lock = threading.Lock()
rear_sequence_number = [0]
def rear_action(front_to_back_ticket, fore_link):
with rear_lock:
if front_to_back_ticket.payload is not None:
response = scenario.response_for_request(front_to_back_ticket.payload)
else:
response = None
terminal = front_to_back_ticket.kind in (
interfaces.FrontToBackTicket.Kind.COMPLETION,
interfaces.FrontToBackTicket.Kind.ENTIRE)
if response is not None or terminal:
if terminal:
kind = interfaces.BackToFrontTicket.Kind.COMPLETION
else:
kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
response)
rear_sequence_number[0] += 1
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)
fore_link = fore.ForeLink(
self.fore_link_pool, {test_method: scenario.deserialize_request},
{test_method: scenario.serialize_response}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
fore_link.start()
port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool,
{test_method: scenario.serialize_request},
{test_method: scenario.deserialize_response}, False, None, None, None)
rear_link.join_fore_link(test_fore_link)
test_fore_link.join_rear_link(rear_link)
rear_link.start()
commencement_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0,
interfaces.FrontToBackTicket.Kind.COMMENCEMENT, test_method,
interfaces.ServicedSubscription.Kind.FULL, None, None,
_TIMEOUT)
fore_sequence_number = 1
rear_link.accept_front_to_back_ticket(commencement_ticket)
for request in scenario.requests():
continuation_ticket = interfaces.FrontToBackTicket(
test_operation_id, fore_sequence_number,
interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, None,
request, None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(continuation_ticket)
completion_ticket = interfaces.FrontToBackTicket(
test_operation_id, fore_sequence_number,
interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, None,
None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(completion_ticket)
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not
interfaces.BackToFrontTicket.Kind.COMPLETION):
test_fore_link.condition.wait()
rear_link.stop()
fore_link.stop()
with test_rear_link.condition:
requests = tuple(
ticket.payload for ticket in test_rear_link.tickets
if ticket.payload is not None)
with test_fore_link.condition:
responses = tuple(
ticket.payload for ticket in test_fore_link.tickets
if ticket.payload is not None)
self.assertTrue(scenario.verify_requests(requests))
self.assertTrue(scenario.verify_responses(responses))
def testEmptyScenario(self):
self._perform_scenario_test(_proto_scenarios.EmptyScenario())
def testBidirectionallyUnaryScenario(self):
self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
def testBidirectionallyStreamingScenario(self):
self._perform_scenario_test(
_proto_scenarios.BidirectionallyStreamingScenario())
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,100 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A test of invocation-side code unconnected to an RPC server."""
import unittest
from grpc._adapter import rear
from grpc.framework.base import interfaces
from grpc.framework.foundation import logging_pool
from grpc_test._adapter import _test_links
_IDENTITY = lambda x: x
_TIMEOUT = 2
class LonelyRearLinkTest(unittest.TestCase):
def setUp(self):
self.pool = logging_pool.pool(8)
def tearDown(self):
self.pool.shutdown(wait=True)
def testUpAndDown(self):
rear_link = rear.RearLink(
'nonexistent', 54321, self.pool, {}, {}, False, None, None, None)
rear_link.start()
rear_link.stop()
def _perform_lonely_client_test_with_ticket_kind(
self, front_to_back_ticket_kind):
test_operation_id = object()
test_method = 'test method'
fore_link = _test_links.ForeLink(None, None)
rear_link = rear.RearLink(
'nonexistent', 54321, self.pool, {test_method: None},
{test_method: None}, False, None, None, None)
rear_link.join_fore_link(fore_link)
rear_link.start()
front_to_back_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0, front_to_back_ticket_kind, test_method,
interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with fore_link.condition:
while True:
if (fore_link.tickets and
fore_link.tickets[-1].kind is not
interfaces.BackToFrontTicket.Kind.CONTINUATION):
break
fore_link.condition.wait()
rear_link.stop()
with fore_link.condition:
self.assertIsNot(
fore_link.tickets[-1].kind,
interfaces.BackToFrontTicket.Kind.COMPLETION)
def testLonelyClientCommencementTicket(self):
self._perform_lonely_client_test_with_ticket_kind(
interfaces.FrontToBackTicket.Kind.COMMENCEMENT)
def testLonelyClientEntireTicket(self):
self._perform_lonely_client_test_with_ticket_kind(
interfaces.FrontToBackTicket.Kind.ENTIRE)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,80 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Links suitable for use in tests."""
import threading
from grpc.framework.base import interfaces
class ForeLink(interfaces.ForeLink):
"""A ForeLink suitable for use in tests of RearLinks."""
def __init__(self, action, rear_link):
self.condition = threading.Condition()
self.tickets = []
self.action = action
self.rear_link = rear_link
def accept_back_to_front_ticket(self, ticket):
with self.condition:
self.tickets.append(ticket)
self.condition.notify_all()
action, rear_link = self.action, self.rear_link
if action is not None:
action(ticket, rear_link)
def join_rear_link(self, rear_link):
with self.condition:
self.rear_link = rear_link
class RearLink(interfaces.RearLink):
"""A RearLink suitable for use in tests of ForeLinks."""
def __init__(self, action, fore_link):
self.condition = threading.Condition()
self.tickets = []
self.action = action
self.fore_link = fore_link
def accept_front_to_back_ticket(self, ticket):
with self.condition:
self.tickets.append(ticket)
self.condition.notify_all()
action, fore_link = self.action, self.fore_link
if action is not None:
action(ticket, fore_link)
def join_fore_link(self, fore_link):
with self.condition:
self.fore_link = fore_link

@ -1,30 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -1,180 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# TODO(nathaniel): Expand this test coverage.
"""Test of the GRPC-backed ForeLink and RearLink."""
import unittest
from grpc.early_adopter import implementations
from grpc.framework.alpha import utilities
from grpc_test._junkdrawer import math_pb2
SERVICE_NAME = 'math.Math'
DIV = 'Div'
DIV_MANY = 'DivMany'
FIB = 'Fib'
SUM = 'Sum'
def _fibbonacci(limit):
left, right = 0, 1
for _ in xrange(limit):
yield left
left, right = right, left + right
def _div(request, unused_context):
return math_pb2.DivReply(
quotient=request.dividend / request.divisor,
remainder=request.dividend % request.divisor)
def _div_many(request_iterator, unused_context):
for request in request_iterator:
yield math_pb2.DivReply(
quotient=request.dividend / request.divisor,
remainder=request.dividend % request.divisor)
def _fib(request, unused_context):
for number in _fibbonacci(request.limit):
yield math_pb2.Num(num=number)
def _sum(request_iterator, unused_context):
accumulation = 0
for request in request_iterator:
accumulation += request.num
return math_pb2.Num(num=accumulation)
_INVOCATION_DESCRIPTIONS = {
DIV: utilities.unary_unary_invocation_description(
math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString),
DIV_MANY: utilities.stream_stream_invocation_description(
math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString),
FIB: utilities.unary_stream_invocation_description(
math_pb2.FibArgs.SerializeToString, math_pb2.Num.FromString),
SUM: utilities.stream_unary_invocation_description(
math_pb2.Num.SerializeToString, math_pb2.Num.FromString),
}
_SERVICE_DESCRIPTIONS = {
DIV: utilities.unary_unary_service_description(
_div, math_pb2.DivArgs.FromString,
math_pb2.DivReply.SerializeToString),
DIV_MANY: utilities.stream_stream_service_description(
_div_many, math_pb2.DivArgs.FromString,
math_pb2.DivReply.SerializeToString),
FIB: utilities.unary_stream_service_description(
_fib, math_pb2.FibArgs.FromString, math_pb2.Num.SerializeToString),
SUM: utilities.stream_unary_service_description(
_sum, math_pb2.Num.FromString, math_pb2.Num.SerializeToString),
}
_TIMEOUT = 3
class EarlyAdopterImplementationsTest(unittest.TestCase):
def setUp(self):
self.server = implementations.server(
SERVICE_NAME, _SERVICE_DESCRIPTIONS, 0)
self.server.start()
port = self.server.port()
self.stub = implementations.stub(
SERVICE_NAME, _INVOCATION_DESCRIPTIONS, 'localhost', port)
def tearDown(self):
self.server.stop()
def testUpAndDown(self):
with self.stub:
pass
def testUnaryUnary(self):
divisor = 59
dividend = 973
expected_quotient = dividend / divisor
expected_remainder = dividend % divisor
with self.stub:
response = self.stub.Div(
math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT)
self.assertEqual(expected_quotient, response.quotient)
self.assertEqual(expected_remainder, response.remainder)
def testUnaryStream(self):
stream_length = 43
with self.stub:
response_iterator = self.stub.Fib(
math_pb2.FibArgs(limit=stream_length), _TIMEOUT)
numbers = tuple(response.num for response in response_iterator)
for early, middle, later in zip(numbers, numbers[:1], numbers[:2]):
self.assertEqual(early + middle, later)
self.assertEqual(stream_length, len(numbers))
def testStreamUnary(self):
stream_length = 127
with self.stub:
response_future = self.stub.Sum.async(
(math_pb2.Num(num=index) for index in range(stream_length)),
_TIMEOUT)
self.assertEqual(
(stream_length * (stream_length - 1)) / 2,
response_future.result().num)
def testStreamStream(self):
stream_length = 179
divisor_offset = 71
dividend_offset = 1763
with self.stub:
response_iterator = self.stub.DivMany(
(math_pb2.DivArgs(
divisor=divisor_offset + index,
dividend=dividend_offset + index)
for index in range(stream_length)),
_TIMEOUT)
for index, response in enumerate(response_iterator):
self.assertEqual(
(dividend_offset + index) / (divisor_offset + index),
response.quotient)
self.assertEqual(
(dividend_offset + index) % (divisor_offset + index),
response.remainder)
self.assertEqual(stream_length, index + 1)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,30 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -1,80 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests for grpc.framework.base.implementations."""
import unittest
from grpc.framework.base import implementations
from grpc.framework.base import util
from grpc.framework.foundation import logging_pool
from grpc_test.framework.base import interfaces_test_case
POOL_MAX_WORKERS = 10
DEFAULT_TIMEOUT = 30
MAXIMUM_TIMEOUT = 60
class ImplementationsTest(
interfaces_test_case.FrontAndBackTest, unittest.TestCase):
def setUp(self):
self.memory_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.front_work_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.front_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.front_utility_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.back_work_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.back_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.back_utility_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.test_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.test_servicer = interfaces_test_case.TestServicer(self.test_pool)
self.front = implementations.front_link(
self.front_work_pool, self.front_transmission_pool,
self.front_utility_pool)
self.back = implementations.back_link(
self.test_servicer, self.back_work_pool, self.back_transmission_pool,
self.back_utility_pool, DEFAULT_TIMEOUT, MAXIMUM_TIMEOUT)
self.front.join_rear_link(self.back)
self.back.join_fore_link(self.front)
def tearDown(self):
util.wait_for_idle(self.back)
util.wait_for_idle(self.front)
self.memory_transmission_pool.shutdown(wait=True)
self.front_work_pool.shutdown(wait=True)
self.front_transmission_pool.shutdown(wait=True)
self.front_utility_pool.shutdown(wait=True)
self.back_work_pool.shutdown(wait=True)
self.back_transmission_pool.shutdown(wait=True)
self.back_utility_pool.shutdown(wait=True)
self.test_pool.shutdown(wait=True)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,307 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Abstract tests against the interfaces of the base layer of RPC Framework."""
import threading
import time
from grpc.framework.base import interfaces
from grpc.framework.base import util
from grpc.framework.foundation import stream
from grpc.framework.foundation import stream_util
from grpc_test.framework.foundation import stream_testing
TICK = 0.1
SMALL_TIMEOUT = TICK * 50
STREAM_LENGTH = 100
SYNCHRONOUS_ECHO = 'synchronous echo'
ASYNCHRONOUS_ECHO = 'asynchronous echo'
IMMEDIATE_FAILURE = 'immediate failure'
TRIGGERED_FAILURE = 'triggered failure'
WAIT_ON_CONDITION = 'wait on condition'
EMPTY_OUTCOME_DICT = {
interfaces.Outcome.COMPLETED: 0,
interfaces.Outcome.CANCELLED: 0,
interfaces.Outcome.EXPIRED: 0,
interfaces.Outcome.RECEPTION_FAILURE: 0,
interfaces.Outcome.TRANSMISSION_FAILURE: 0,
interfaces.Outcome.SERVICER_FAILURE: 0,
interfaces.Outcome.SERVICED_FAILURE: 0,
}
def _synchronous_echo(output_consumer):
return stream_util.TransformingConsumer(lambda x: x, output_consumer)
class AsynchronousEcho(stream.Consumer):
"""A stream.Consumer that echoes its input to another stream.Consumer."""
def __init__(self, output_consumer, pool):
self._lock = threading.Lock()
self._output_consumer = output_consumer
self._pool = pool
self._queue = []
self._spinning = False
def _spin(self, value, complete):
while True:
if value:
if complete:
self._output_consumer.consume_and_terminate(value)
else:
self._output_consumer.consume(value)
elif complete:
self._output_consumer.terminate()
with self._lock:
if self._queue:
value, complete = self._queue.pop(0)
else:
self._spinning = False
return
def consume(self, value):
with self._lock:
if self._spinning:
self._queue.append((value, False))
else:
self._spinning = True
self._pool.submit(self._spin, value, False)
def terminate(self):
with self._lock:
if self._spinning:
self._queue.append((None, True))
else:
self._spinning = True
self._pool.submit(self._spin, None, True)
def consume_and_terminate(self, value):
with self._lock:
if self._spinning:
self._queue.append((value, True))
else:
self._spinning = True
self._pool.submit(self._spin, value, True)
class TestServicer(interfaces.Servicer):
"""An interfaces.Servicer with instrumented for testing."""
def __init__(self, pool):
self._pool = pool
self.condition = threading.Condition()
self._released = False
def service(self, name, context, output_consumer):
if name == SYNCHRONOUS_ECHO:
return _synchronous_echo(output_consumer)
elif name == ASYNCHRONOUS_ECHO:
return AsynchronousEcho(output_consumer, self._pool)
elif name == IMMEDIATE_FAILURE:
raise ValueError()
elif name == TRIGGERED_FAILURE:
raise NotImplementedError
elif name == WAIT_ON_CONDITION:
with self.condition:
while not self._released:
self.condition.wait()
return _synchronous_echo(output_consumer)
else:
raise NotImplementedError()
def release(self):
with self.condition:
self._released = True
self.condition.notify_all()
class EasyServicedIngestor(interfaces.ServicedIngestor):
"""A trivial implementation of interfaces.ServicedIngestor."""
def __init__(self, consumer):
self._consumer = consumer
def consumer(self, operation_context):
"""See interfaces.ServicedIngestor.consumer for specification."""
return self._consumer
class FrontAndBackTest(object):
"""A test suite usable against any joined Front and Back."""
# Pylint doesn't know that this is a unittest.TestCase mix-in.
# pylint: disable=invalid-name
def testSimplestCall(self):
"""Tests the absolute simplest call - a one-ticket fire-and-forget."""
self.front.operate(
SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT,
util.none_serviced_subscription(), 'test trace ID')
util.wait_for_idle(self.front)
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
# the back could have experienced execution up to this point:
# (1) The ticket is still either in the front waiting to be transmitted
# or is somewhere on the link between the front and the back. The back has
# no idea that this test is even happening. Calling wait_for_idle on it
# would do no good because in this case the back is idle and the call would
# return with the ticket bound for it still in the front or on the link.
back_operation_stats = self.back.operation_stats()
first_back_possibility = EMPTY_OUTCOME_DICT
# (2) The ticket arrived at the back and the back completed the operation.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.Outcome.COMPLETED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
# It's true that if the ticket had arrived at the back and the back had
# begun processing that wait_for_idle could hold test execution until the
# back completed the operation, but that doesn't really collapse the
# possibility space down to one solution.
def testEntireEcho(self):
"""Tests a very simple one-ticket-each-way round-trip."""
test_payload = 'test payload'
test_consumer = stream_testing.TestConsumer()
subscription = util.full_serviced_subscription(
EasyServicedIngestor(test_consumer))
self.front.operate(
ASYNCHRONOUS_ECHO, test_payload, True, SMALL_TIMEOUT, subscription,
'test trace ID')
util.wait_for_idle(self.front)
util.wait_for_idle(self.back)
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertEqual(
1, self.back.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertListEqual([(test_payload, True)], test_consumer.calls)
def testBidirectionalStreamingEcho(self):
"""Tests sending multiple tickets each way."""
test_payload_template = 'test_payload: %03d'
test_payloads = [test_payload_template % i for i in range(STREAM_LENGTH)]
test_consumer = stream_testing.TestConsumer()
subscription = util.full_serviced_subscription(
EasyServicedIngestor(test_consumer))
operation = self.front.operate(
SYNCHRONOUS_ECHO, None, False, SMALL_TIMEOUT, subscription,
'test trace ID')
for test_payload in test_payloads:
operation.consumer.consume(test_payload)
operation.consumer.terminate()
util.wait_for_idle(self.front)
util.wait_for_idle(self.back)
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertEqual(
1, self.back.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertListEqual(test_payloads, test_consumer.values())
def testCancellation(self):
"""Tests cancelling a long-lived operation."""
test_consumer = stream_testing.TestConsumer()
subscription = util.full_serviced_subscription(
EasyServicedIngestor(test_consumer))
operation = self.front.operate(
ASYNCHRONOUS_ECHO, None, False, SMALL_TIMEOUT, subscription,
'test trace ID')
operation.cancel()
util.wait_for_idle(self.front)
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.CANCELLED])
util.wait_for_idle(self.back)
self.assertListEqual([], test_consumer.calls)
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
# the back could have experienced execution up to this point:
# (1) Both tickets are still either in the front waiting to be transmitted
# or are somewhere on the link between the front and the back. The back has
# no idea that this test is even happening. Calling wait_for_idle on it
# would do no good because in this case the back is idle and the call would
# return with the tickets bound for it still in the front or on the link.
back_operation_stats = self.back.operation_stats()
first_back_possibility = EMPTY_OUTCOME_DICT
# (2) Both tickets arrived within SMALL_TIMEOUT of one another at the back.
# The back started processing based on the first ticket and then stopped
# upon receiving the cancellation ticket.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.Outcome.CANCELLED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
def testExpiration(self):
"""Tests that operations time out."""
timeout = TICK * 2
allowance = TICK # How much extra time to
condition = threading.Condition()
test_payload = 'test payload'
subscription = util.termination_only_serviced_subscription()
start_time = time.time()
outcome_cell = [None]
termination_time_cell = [None]
def termination_action(outcome):
with condition:
outcome_cell[0] = outcome
termination_time_cell[0] = time.time()
condition.notify()
with condition:
operation = self.front.operate(
SYNCHRONOUS_ECHO, test_payload, False, timeout, subscription,
'test trace ID')
operation.context.add_termination_callback(termination_action)
while outcome_cell[0] is None:
condition.wait()
duration = termination_time_cell[0] - start_time
self.assertLessEqual(timeout, duration)
self.assertLess(duration, timeout + allowance)
self.assertEqual(interfaces.Outcome.EXPIRED, outcome_cell[0])
util.wait_for_idle(self.front)
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.EXPIRED])
util.wait_for_idle(self.back)
self.assertLessEqual(
1, self.back.operation_stats()[interfaces.Outcome.EXPIRED])

@ -1,61 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Common lifecycle code for in-memory-ticket-exchange Face-layer tests."""
from grpc.framework.face import implementations
from grpc.framework.foundation import logging_pool
from grpc_test.framework.face.testing import base_util
from grpc_test.framework.face.testing import test_case
_TIMEOUT = 3
_MAXIMUM_POOL_SIZE = 10
class FaceTestCase(test_case.FaceTestCase):
"""Provides abstract Face-layer tests an in-memory implementation."""
def set_up_implementation(
self, name, methods, method_implementations,
multi_method_implementation):
servicer_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
stub_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
servicer = implementations.servicer(
servicer_pool, method_implementations, multi_method_implementation)
linked_pair = base_util.linked_pair(servicer, _TIMEOUT)
stub = implementations.generic_stub(linked_pair.front, stub_pool)
return stub, (servicer_pool, stub_pool, linked_pair)
def tear_down_implementation(self, memo):
servicer_pool, stub_pool, linked_pair = memo
linked_pair.shut_down()
stub_pool.shutdown(wait=True)
servicer_pool.shutdown(wait=True)

@ -1,46 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from grpc_test.framework.face import _test_case
from grpc_test.framework.face.testing import blocking_invocation_inline_service_test_case as test_case
class BlockingInvocationInlineServiceTest(
_test_case.FaceTestCase,
test_case.BlockingInvocationInlineServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,46 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from grpc_test.framework.face import _test_case
from grpc_test.framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case
class EventInvocationSynchronousEventServiceTest(
_test_case.FaceTestCase,
test_case.EventInvocationSynchronousEventServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,46 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from grpc_test.framework.face import _test_case
from grpc_test.framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case
class FutureInvocationAsynchronousEventServiceTest(
_test_case.FaceTestCase,
test_case.FutureInvocationAsynchronousEventServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,70 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utility for serialization in the context of test RPC services."""
import collections
class Serialization(
collections.namedtuple(
'_Serialization',
['request_serializers',
'request_deserializers',
'response_serializers',
'response_deserializers'])):
"""An aggregation of serialization behaviors for an RPC service.
Attributes:
request_serializers: A dict from method name to request object serializer
behavior.
request_deserializers: A dict from method name to request object
deserializer behavior.
response_serializers: A dict from method name to response object serializer
behavior.
response_deserializers: A dict from method name to response object
deserializer behavior.
"""
def serialization(methods):
"""Creates a Serialization from a sequences of interfaces.Method objects."""
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for method in methods:
name = method.name()
request_serializers[name] = method.serialize_request
request_deserializers[name] = method.deserialize_request
response_serializers[name] = method.serialize_response
response_deserializers[name] = method.deserialize_response
return Serialization(
request_serializers, request_deserializers, response_serializers,
response_deserializers)
Loading…
Cancel
Save