Merge pull request #6962 from nathanielmanistaatgoogle/delete-dead-code-tests

Delete tests of dead pre-GA code
pull/6971/head
Jan Tattermusch 9 years ago committed by GitHub
commit 8d3477c9a6
  1. 30
      src/python/grpcio/tests/tests.json
  2. 429
      src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
  3. 319
      src/python/grpcio/tests/unit/_adapter/_low_test.py
  4. 157
      src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py
  5. 163
      src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py
  6. 88
      src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py
  7. 239
      src/python/grpcio/tests/unit/_links/_transmission_test.py
  8. 113
      src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py
  9. 96
      src/python/grpcio/tests/unit/framework/core/_base_interface_test.py
  10. 151
      src/python/grpcio/tests/unit/framework/foundation/_later_test.py

@ -4,10 +4,6 @@
"_api_test.ChannelTest",
"_auth_test.AccessTokenCallCredentialsTest",
"_auth_test.GoogleCallCredentialsTest",
"_base_interface_test.AsyncEasyTest",
"_base_interface_test.AsyncPeasyTest",
"_base_interface_test.SyncEasyTest",
"_base_interface_test.SyncPeasyTest",
"_beta_features_test.BetaFeaturesTest",
"_beta_features_test.ContextManagementAndLifecycleTest",
"_cancel_many_calls_test.CancelManyCallsTest",
@ -16,22 +12,6 @@
"_channel_test.ChannelTest",
"_connectivity_channel_test.ChannelConnectivityTest",
"_connectivity_channel_test.ConnectivityStatesTest",
"_core_over_links_base_interface_test.AsyncEasyTest",
"_core_over_links_base_interface_test.AsyncPeasyTest",
"_core_over_links_base_interface_test.SyncEasyTest",
"_core_over_links_base_interface_test.SyncPeasyTest",
"_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_empty_message_test.EmptyMessageTest",
"_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
@ -43,15 +23,7 @@
"_implementations_test.CallCredentialsTest",
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
"_intermediary_low_test.CancellationTest",
"_intermediary_low_test.EchoTest",
"_intermediary_low_test.ExpirationTest",
"_intermediary_low_test.LonelyClientTest",
"_later_test.LaterTest",
"_logging_pool_test.LoggingPoolTest",
"_lonely_invocation_link_test.LonelyInvocationLinkTest",
"_low_test.HangingServerShutdown",
"_low_test.InsecureServerInsecureClient",
"_metadata_test.MetadataTest",
"_not_found_test.NotFoundTest",
"_python_plugin_test.PythonPluginTest",
@ -60,8 +32,6 @@
"_sanity_test.Sanity",
"_secure_interop_test.SecureInteropTest",
"_thread_cleanup_test.CleanupThreadTest",
"_transmission_test.RoundTripTest",
"_transmission_test.TransmissionTest",
"_utilities_test.ChannelConnectivityTest",
"beta_python_plugin_test.PythonPluginTest",
"cygrpc_test.InsecureServerInsecureClient",

@ -1,429 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests for the old '_low'."""
import threading
import time
import unittest
import six
from six.moves import queue
from grpc._adapter import _intermediary_low as _low
_STREAM_LENGTH = 300
_TIMEOUT = 5
_AFTER_DELAY = 2
_FUTURE = time.time() + 60 * 60 * 24
_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200
_BYTE_SEQUENCE_SEQUENCE = tuple(
bytes(bytearray((row + column) % 256 for column in range(row)))
for row in range(_STREAM_LENGTH))
class LonelyClientTest(unittest.TestCase):
def testLonelyClient(self):
host = 'nosuchhostexists'
port = 54321
method = 'test method'
deadline = time.time() + _TIMEOUT
after_deadline = deadline + _AFTER_DELAY
metadata_tag = object()
finish_tag = object()
completion_queue = _low.CompletionQueue()
channel = _low.Channel('%s:%d' % (host, port), None)
client_call = _low.Call(channel, completion_queue, method, host, deadline)
client_call.invoke(completion_queue, metadata_tag, finish_tag)
first_event = completion_queue.get(after_deadline)
self.assertIsNotNone(first_event)
second_event = completion_queue.get(after_deadline)
self.assertIsNotNone(second_event)
kinds = [event.kind for event in (first_event, second_event)]
six.assertCountEqual(self,
(_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
kinds)
self.assertIsNone(completion_queue.get(after_deadline))
completion_queue.stop()
stop_event = completion_queue.get(_FUTURE)
self.assertEqual(_low.Event.Kind.STOP, stop_event.kind)
del client_call
del channel
del completion_queue
def _drive_completion_queue(completion_queue, event_queue):
while True:
event = completion_queue.get(_FUTURE)
if event.kind is _low.Event.Kind.STOP:
break
event_queue.put(event)
class EchoTest(unittest.TestCase):
def setUp(self):
self.host = 'localhost'
self.server_completion_queue = _low.CompletionQueue()
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
self.server_events = queue.Queue()
self.server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue,
args=(self.server_completion_queue, self.server_events))
self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None)
self.client_events = queue.Queue()
self.client_completion_queue_thread = threading.Thread(
target=_drive_completion_queue,
args=(self.client_completion_queue, self.client_events))
self.client_completion_queue_thread.start()
def tearDown(self):
self.server.stop()
self.server.cancel_all_calls()
self.server_completion_queue.stop()
self.client_completion_queue.stop()
self.server_completion_queue_thread.join()
self.client_completion_queue_thread.join()
del self.server
def _perform_echo_test(self, test_data):
method = 'test method'
details = 'test details'
server_leading_metadata_key = 'my_server_leading_key'
server_leading_metadata_value = 'my_server_leading_value'
server_trailing_metadata_key = 'my_server_trailing_key'
server_trailing_metadata_value = 'my_server_trailing_value'
client_metadata_key = 'my_client_key'
client_metadata_value = 'my_client_value'
server_leading_binary_metadata_key = 'my_server_leading_key-bin'
server_leading_binary_metadata_value = b'\0'*2047
server_trailing_binary_metadata_key = 'my_server_trailing_key-bin'
server_trailing_binary_metadata_value = b'\0'*2047
client_binary_metadata_key = 'my_client_key-bin'
client_binary_metadata_value = b'\0'*2047
deadline = _FUTURE
metadata_tag = object()
finish_tag = object()
write_tag = object()
complete_tag = object()
service_tag = object()
read_tag = object()
status_tag = object()
server_data = []
client_data = []
client_call = _low.Call(self.channel, self.client_completion_queue,
method, self.host, deadline)
client_call.add_metadata(client_metadata_key, client_metadata_value)
client_call.add_metadata(client_binary_metadata_key,
client_binary_metadata_value)
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
service_accepted = self.server_events.get()
self.assertIsNotNone(service_accepted)
self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
self.assertIs(service_accepted.tag, service_tag)
self.assertEqual(method.encode(), service_accepted.service_acceptance.method)
self.assertEqual(self.host.encode(), service_accepted.service_acceptance.host)
self.assertIsNotNone(service_accepted.service_acceptance.call)
metadata = dict(service_accepted.metadata)
self.assertIn(client_metadata_key.encode(), metadata)
self.assertEqual(client_metadata_value.encode(), metadata[client_metadata_key.encode()])
self.assertIn(client_binary_metadata_key.encode(), metadata)
self.assertEqual(client_binary_metadata_value,
metadata[client_binary_metadata_key.encode()])
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.add_metadata(server_leading_metadata_key,
server_leading_metadata_value)
server_call.add_metadata(server_leading_binary_metadata_key,
server_leading_binary_metadata_value)
server_call.premetadata()
metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted)
self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
self.assertEqual(metadata_tag, metadata_accepted.tag)
metadata = dict(metadata_accepted.metadata)
self.assertIn(server_leading_metadata_key.encode(), metadata)
self.assertEqual(server_leading_metadata_value.encode(),
metadata[server_leading_metadata_key.encode()])
self.assertIn(server_leading_binary_metadata_key.encode(), metadata)
self.assertEqual(server_leading_binary_metadata_value,
metadata[server_leading_binary_metadata_key.encode()])
for datum in test_data:
client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS)
write_accepted = self.client_events.get()
self.assertIsNotNone(write_accepted)
self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
self.assertIs(write_accepted.tag, write_tag)
self.assertIs(write_accepted.write_accepted, True)
server_call.read(read_tag)
read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNotNone(read_accepted.bytes)
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag, 0)
write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
self.assertEqual(write_tag, write_accepted.tag)
self.assertTrue(write_accepted.write_accepted)
client_call.read(read_tag)
read_accepted = self.client_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNotNone(read_accepted.bytes)
client_data.append(read_accepted.bytes)
client_call.complete(complete_tag)
complete_accepted = self.client_events.get()
self.assertIsNotNone(complete_accepted)
self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
self.assertIs(complete_accepted.tag, complete_tag)
self.assertIs(complete_accepted.complete_accepted, True)
server_call.read(read_tag)
read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNone(read_accepted.bytes)
server_call.add_metadata(server_trailing_metadata_key,
server_trailing_metadata_value)
server_call.add_metadata(server_trailing_binary_metadata_key,
server_trailing_binary_metadata_value)
server_call.status(_low.Status(_low.Code.OK, details), status_tag)
server_terminal_event_one = self.server_events.get()
server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
status_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
else:
status_accepted = server_terminal_event_two
rpc_accepted = server_terminal_event_one
self.assertIsNotNone(status_accepted)
self.assertIsNotNone(rpc_accepted)
self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind)
self.assertEqual(status_tag, status_accepted.tag)
self.assertTrue(status_accepted.complete_accepted)
self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
self.assertEqual(finish_tag, rpc_accepted.tag)
self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
client_call.read(read_tag)
client_terminal_event_one = self.client_events.get()
client_terminal_event_two = self.client_events.get()
if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = client_terminal_event_one
finish_accepted = client_terminal_event_two
else:
read_accepted = client_terminal_event_two
finish_accepted = client_terminal_event_one
self.assertIsNotNone(read_accepted)
self.assertIsNotNone(finish_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNone(read_accepted.bytes)
self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
self.assertEqual(finish_tag, finish_accepted.tag)
self.assertEqual(_low.Status(_low.Code.OK, details.encode()), finish_accepted.status)
metadata = dict(finish_accepted.metadata)
self.assertIn(server_trailing_metadata_key.encode(), metadata)
self.assertEqual(server_trailing_metadata_value.encode(),
metadata[server_trailing_metadata_key.encode()])
self.assertIn(server_trailing_binary_metadata_key.encode(), metadata)
self.assertEqual(server_trailing_binary_metadata_value,
metadata[server_trailing_binary_metadata_key.encode()])
self.assertSetEqual(set(key for key, _ in finish_accepted.metadata),
set((server_trailing_metadata_key.encode(),
server_trailing_binary_metadata_key.encode(),)))
self.assertSequenceEqual(test_data, server_data)
self.assertSequenceEqual(test_data, client_data)
def testNoEcho(self):
self._perform_echo_test(())
def testOneByteEcho(self):
self._perform_echo_test([b'\x07'])
def testOneManyByteEcho(self):
self._perform_echo_test([_BYTE_SEQUENCE])
def testManyOneByteEchoes(self):
self._perform_echo_test(
[_BYTE_SEQUENCE[i:i+1] for i in range(len(_BYTE_SEQUENCE))])
def testManyManyByteEchoes(self):
self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE)
class CancellationTest(unittest.TestCase):
def setUp(self):
self.host = 'localhost'
self.server_completion_queue = _low.CompletionQueue()
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
self.server_events = queue.Queue()
self.server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue,
args=(self.server_completion_queue, self.server_events))
self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None)
self.client_events = queue.Queue()
self.client_completion_queue_thread = threading.Thread(
target=_drive_completion_queue,
args=(self.client_completion_queue, self.client_events))
self.client_completion_queue_thread.start()
def tearDown(self):
self.server.stop()
self.server.cancel_all_calls()
self.server_completion_queue.stop()
self.client_completion_queue.stop()
self.server_completion_queue_thread.join()
self.client_completion_queue_thread.join()
del self.server
def testCancellation(self):
method = 'test method'
deadline = _FUTURE
metadata_tag = object()
finish_tag = object()
write_tag = object()
service_tag = object()
read_tag = object()
test_data = _BYTE_SEQUENCE_SEQUENCE
server_data = []
client_data = []
client_call = _low.Call(self.channel, self.client_completion_queue,
method, self.host, deadline)
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
service_accepted = self.server_events.get()
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.premetadata()
metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted)
for datum in test_data:
client_call.write(datum, write_tag, 0)
write_accepted = self.client_events.get()
server_call.read(read_tag)
read_accepted = self.server_events.get()
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag, 0)
write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
client_call.read(read_tag)
read_accepted = self.client_events.get()
client_data.append(read_accepted.bytes)
client_call.cancel()
# cancel() is idempotent.
client_call.cancel()
client_call.cancel()
client_call.cancel()
server_call.read(read_tag)
server_terminal_event_one = self.server_events.get()
server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
else:
read_accepted = server_terminal_event_two
rpc_accepted = server_terminal_event_one
self.assertIsNotNone(read_accepted)
self.assertIsNotNone(rpc_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertIsNone(read_accepted.bytes)
self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
finish_event = self.client_events.get()
self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, b'Cancelled'),
finish_event.status)
self.assertSequenceEqual(test_data, server_data)
self.assertSequenceEqual(test_data, client_data)
class ExpirationTest(unittest.TestCase):
@unittest.skip('TODO(nathaniel): Expiration test!')
def testExpiration(self):
pass
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,319 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import threading
import time
import unittest
from grpc import _grpcio_metadata
from grpc._adapter import _types
from grpc._adapter import _low
from tests.unit import test_common
def wait_for_events(completion_queues, deadline):
"""
Args:
completion_queues: list of completion queues to wait for events on
deadline: absolute deadline to wait until
Returns:
a sequence of events of length len(completion_queues).
"""
results = [None] * len(completion_queues)
lock = threading.Lock()
threads = []
def set_ith_result(i, completion_queue):
result = completion_queue.next(deadline)
with lock:
results[i] = result
for i, completion_queue in enumerate(completion_queues):
thread = threading.Thread(target=set_ith_result,
args=[i, completion_queue])
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
return results
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
self.server_completion_queue = _low.CompletionQueue()
self.server = _low.Server(self.server_completion_queue, [])
self.port = self.server.add_http2_port('[::]:0')
self.client_completion_queue = _low.CompletionQueue()
self.client_channel = _low.Channel('localhost:%d'%self.port, [])
self.server.start()
def tearDown(self):
self.server.shutdown()
del self.client_channel
self.client_completion_queue.shutdown()
while (self.client_completion_queue.next(float('+inf')).type !=
_types.EventType.QUEUE_SHUTDOWN):
pass
self.server_completion_queue.shutdown()
while (self.server_completion_queue.next(float('+inf')).type !=
_types.EventType.QUEUE_SHUTDOWN):
pass
del self.client_completion_queue
del self.server_completion_queue
del self.server
def testEcho(self):
deadline = time.time() + 5
event_time_tolerance = 2
deadline_tolerance = 0.25
client_metadata_ascii_key = 'key'
client_metadata_ascii_value = 'val'
client_metadata_bin_key = 'key-bin'
client_metadata_bin_value = b'\0'*1000
server_initial_metadata_key = 'init_me_me_me'
server_initial_metadata_value = 'whodawha?'
server_trailing_metadata_key = 'california_is_in_a_drought'
server_trailing_metadata_value = 'zomg it is'
server_status_code = _types.StatusCode.OK
server_status_details = 'our work is never over'
request = 'blarghaflargh'
response = 'his name is robert paulson'
method = 'twinkies'
host = 'hostess'
server_request_tag = object()
request_call_result = self.server.request_call(self.server_completion_queue,
server_request_tag)
self.assertEqual(_types.CallError.OK, request_call_result)
client_call_tag = object()
client_call = self.client_channel.create_call(
self.client_completion_queue, method, host, deadline)
client_initial_metadata = [
(client_metadata_ascii_key, client_metadata_ascii_value),
(client_metadata_bin_key, client_metadata_bin_value)
]
client_start_batch_result = client_call.start_batch([
_types.OpArgs.send_initial_metadata(client_initial_metadata),
_types.OpArgs.send_message(request, 0),
_types.OpArgs.send_close_from_client(),
_types.OpArgs.recv_initial_metadata(),
_types.OpArgs.recv_message(),
_types.OpArgs.recv_status_on_client()
], client_call_tag)
self.assertEqual(_types.CallError.OK, client_start_batch_result)
client_no_event, request_event, = wait_for_events(
[self.client_completion_queue, self.server_completion_queue],
time.time() + event_time_tolerance)
self.assertEqual(client_no_event, None)
self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEqual(1, len(request_event.results))
received_initial_metadata = request_event.results[0].initial_metadata
# Check that our metadata were transmitted
self.assertTrue(test_common.metadata_transmitted(client_initial_metadata,
received_initial_metadata))
# Check that Python's user agent string is a part of the full user agent
# string
received_initial_metadata_dict = dict(received_initial_metadata)
self.assertIn(b'user-agent', received_initial_metadata_dict)
self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__).encode(),
received_initial_metadata_dict[b'user-agent'])
self.assertEqual(method.encode(), request_event.call_details.method)
self.assertEqual(host.encode(), request_event.call_details.host)
self.assertLess(abs(deadline - request_event.call_details.deadline),
deadline_tolerance)
# Check that the channel is connected, and that both it and the call have
# the proper target and peer; do this after the first flurry of messages to
# avoid the possibility that connection was delayed by the core until the
# first message was sent.
self.assertEqual(_types.ConnectivityState.READY,
self.client_channel.check_connectivity_state(False))
self.assertIsNotNone(self.client_channel.target())
self.assertIsNotNone(client_call.peer())
server_call_tag = object()
server_call = request_event.call
server_initial_metadata = [
(server_initial_metadata_key, server_initial_metadata_value)
]
server_trailing_metadata = [
(server_trailing_metadata_key, server_trailing_metadata_value)
]
server_start_batch_result = server_call.start_batch([
_types.OpArgs.send_initial_metadata(server_initial_metadata),
_types.OpArgs.recv_message(),
_types.OpArgs.send_message(response, 0),
_types.OpArgs.recv_close_on_server(),
_types.OpArgs.send_status_from_server(
server_trailing_metadata, server_status_code, server_status_details)
], server_call_tag)
self.assertEqual(_types.CallError.OK, server_start_batch_result)
client_event, server_event, = wait_for_events(
[self.client_completion_queue, self.server_completion_queue],
time.time() + event_time_tolerance)
self.assertEqual(6, len(client_event.results))
found_client_op_types = set()
for client_result in client_event.results:
# we expect each op type to be unique
self.assertNotIn(client_result.type, found_client_op_types)
found_client_op_types.add(client_result.type)
if client_result.type == _types.OpType.RECV_INITIAL_METADATA:
self.assertTrue(
test_common.metadata_transmitted(server_initial_metadata,
client_result.initial_metadata))
elif client_result.type == _types.OpType.RECV_MESSAGE:
self.assertEqual(response.encode(), client_result.message)
elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
self.assertTrue(
test_common.metadata_transmitted(server_trailing_metadata,
client_result.trailing_metadata))
self.assertEqual(server_status_details.encode(), client_result.status.details)
self.assertEqual(server_status_code, client_result.status.code)
self.assertEqual(set([
_types.OpType.SEND_INITIAL_METADATA,
_types.OpType.SEND_MESSAGE,
_types.OpType.SEND_CLOSE_FROM_CLIENT,
_types.OpType.RECV_INITIAL_METADATA,
_types.OpType.RECV_MESSAGE,
_types.OpType.RECV_STATUS_ON_CLIENT
]), found_client_op_types)
self.assertEqual(5, len(server_event.results))
found_server_op_types = set()
for server_result in server_event.results:
self.assertNotIn(client_result.type, found_server_op_types)
found_server_op_types.add(server_result.type)
if server_result.type == _types.OpType.RECV_MESSAGE:
self.assertEqual(request.encode(), server_result.message)
elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
self.assertFalse(server_result.cancelled)
self.assertEqual(set([
_types.OpType.SEND_INITIAL_METADATA,
_types.OpType.RECV_MESSAGE,
_types.OpType.SEND_MESSAGE,
_types.OpType.RECV_CLOSE_ON_SERVER,
_types.OpType.SEND_STATUS_FROM_SERVER
]), found_server_op_types)
del client_call
del server_call
class HangingServerShutdown(unittest.TestCase):
def setUp(self):
self.server_completion_queue = _low.CompletionQueue()
self.server = _low.Server(self.server_completion_queue, [])
self.port = self.server.add_http2_port('[::]:0')
self.client_completion_queue = _low.CompletionQueue()
self.client_channel = _low.Channel('localhost:%d'%self.port, [])
self.server.start()
def tearDown(self):
self.server.shutdown()
del self.client_channel
self.client_completion_queue.shutdown()
self.server_completion_queue.shutdown()
while True:
client_event, server_event = wait_for_events(
[self.client_completion_queue, self.server_completion_queue],
float("+inf"))
if (client_event.type == _types.EventType.QUEUE_SHUTDOWN and
server_event.type == _types.EventType.QUEUE_SHUTDOWN):
break
del self.client_completion_queue
del self.server_completion_queue
del self.server
def testHangingServerCall(self):
deadline = time.time() + 5
deadline_tolerance = 0.25
event_time_tolerance = 2
cancel_all_calls_time_tolerance = 0.5
request = 'blarghaflargh'
method = 'twinkies'
host = 'hostess'
server_request_tag = object()
request_call_result = self.server.request_call(self.server_completion_queue,
server_request_tag)
client_call_tag = object()
client_call = self.client_channel.create_call(self.client_completion_queue,
method, host, deadline)
client_start_batch_result = client_call.start_batch([
_types.OpArgs.send_initial_metadata([]),
_types.OpArgs.send_message(request, 0),
_types.OpArgs.send_close_from_client(),
_types.OpArgs.recv_initial_metadata(),
_types.OpArgs.recv_message(),
_types.OpArgs.recv_status_on_client()
], client_call_tag)
client_no_event, request_event, = wait_for_events(
[self.client_completion_queue, self.server_completion_queue],
time.time() + event_time_tolerance)
# Now try to shutdown the server and expect that we see server shutdown
# almost immediately after calling cancel_all_calls.
# First attempt to cancel all calls before shutting down, and expect
# our state machine to catch the erroneous API use.
with self.assertRaises(RuntimeError):
self.server.cancel_all_calls()
shutdown_tag = object()
self.server.shutdown(shutdown_tag)
pre_cancel_timestamp = time.time()
self.server.cancel_all_calls()
finish_shutdown_timestamp = None
client_call_event, server_shutdown_event = wait_for_events(
[self.client_completion_queue, self.server_completion_queue],
time.time() + event_time_tolerance)
self.assertIs(shutdown_tag, server_shutdown_event.tag)
self.assertGreater(pre_cancel_timestamp + cancel_all_calls_time_tolerance,
time.time())
del client_call
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,157 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests Base interface compliance of the core-over-gRPC-links stack."""
import collections
import logging
import random
import time
import unittest
import six
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.core import implementations
from grpc.framework.interfaces.base import utilities
from tests.unit import test_common as grpc_test_common
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.base import test_cases
from tests.unit.framework.interfaces.base import test_interfaces
class _SerializationBehaviors(
collections.namedtuple(
'_SerializationBehaviors',
('request_serializers', 'request_deserializers', 'response_serializers',
'response_deserializers',))):
pass
class _Links(
collections.namedtuple(
'_Links',
('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
'service_end_link'))):
pass
def _serialization_behaviors_from_serializations(serializations):
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for (group, method), serialization in six.iteritems(serializations):
request_serializers[group, method] = serialization.serialize_request
request_deserializers[group, method] = serialization.deserialize_request
response_serializers[group, method] = serialization.serialize_response
response_deserializers[group, method] = serialization.deserialize_response
return _SerializationBehaviors(
request_serializers, request_deserializers, response_serializers,
response_deserializers)
class _Implementation(test_interfaces.Implementation):
def instantiate(self, serializations, servicer):
serialization_behaviors = _serialization_behaviors_from_serializations(
serializations)
invocation_end_link = implementations.invocation_end_link()
service_end_link = implementations.service_end_link(
servicer, test_constants.DEFAULT_TIMEOUT,
test_constants.MAXIMUM_TIMEOUT)
service_grpc_link = service.service_link(
serialization_behaviors.request_deserializers,
serialization_behaviors.response_serializers)
port = service_grpc_link.add_port('[::]:0', None)
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link(
channel, b'localhost', None,
serialization_behaviors.request_serializers,
serialization_behaviors.response_deserializers)
invocation_end_link.join_link(invocation_grpc_link)
invocation_grpc_link.join_link(invocation_end_link)
service_end_link.join_link(service_grpc_link)
service_grpc_link.join_link(service_end_link)
invocation_grpc_link.start()
service_grpc_link.start()
return invocation_end_link, service_end_link, (
invocation_grpc_link, service_grpc_link)
def destantiate(self, memo):
invocation_grpc_link, service_grpc_link = memo
invocation_grpc_link.stop()
service_grpc_link.begin_stop()
service_grpc_link.end_stop()
def invocation_initial_metadata(self):
return grpc_test_common.INVOCATION_INITIAL_METADATA
def service_initial_metadata(self):
return grpc_test_common.SERVICE_INITIAL_METADATA
def invocation_completion(self):
return utilities.completion(None, None, None)
def service_completion(self):
return utilities.completion(
grpc_test_common.SERVICE_TERMINAL_METADATA,
beta_interfaces.StatusCode.OK, grpc_test_common.DETAILS)
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
original_metadata, transmitted_metadata)
def completion_transmitted(self, original_completion, transmitted_completion):
if (original_completion.terminal_metadata is not None and
not grpc_test_common.metadata_transmitted(
original_completion.terminal_metadata,
transmitted_completion.terminal_metadata)):
return False
elif original_completion.code is not transmitted_completion.code:
return False
elif original_completion.message != transmitted_completion.message:
return False
else:
return True
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,163 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests Face compliance of the crust-over-core-over-gRPC-links stack."""
import collections
import unittest
import six
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.core import implementations as core_implementations
from grpc.framework.crust import implementations as crust_implementations
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.links import utilities
from tests.unit import test_common as grpc_test_common
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.face import test_cases
from tests.unit.framework.interfaces.face import test_interfaces
class _SerializationBehaviors(
collections.namedtuple(
'_SerializationBehaviors',
('request_serializers', 'request_deserializers', 'response_serializers',
'response_deserializers',))):
pass
def _serialization_behaviors_from_test_methods(test_methods):
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for (group, method), test_method in six.iteritems(test_methods):
request_serializers[group, method] = test_method.serialize_request
request_deserializers[group, method] = test_method.deserialize_request
response_serializers[group, method] = test_method.serialize_response
response_deserializers[group, method] = test_method.deserialize_response
return _SerializationBehaviors(
request_serializers, request_deserializers, response_serializers,
response_deserializers)
class _Implementation(test_interfaces.Implementation):
def instantiate(
self, methods, method_implementations, multi_method_implementation):
pool = logging_pool.pool(test_constants.POOL_SIZE)
servicer = crust_implementations.servicer(
method_implementations, multi_method_implementation, pool)
serialization_behaviors = _serialization_behaviors_from_test_methods(
methods)
invocation_end_link = core_implementations.invocation_end_link()
service_end_link = core_implementations.service_end_link(
servicer, test_constants.DEFAULT_TIMEOUT,
test_constants.MAXIMUM_TIMEOUT)
service_grpc_link = service.service_link(
serialization_behaviors.request_deserializers,
serialization_behaviors.response_serializers)
port = service_grpc_link.add_port('[::]:0', None)
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link(
channel, b'localhost', None,
serialization_behaviors.request_serializers,
serialization_behaviors.response_deserializers)
invocation_end_link.join_link(invocation_grpc_link)
invocation_grpc_link.join_link(invocation_end_link)
service_grpc_link.join_link(service_end_link)
service_end_link.join_link(service_grpc_link)
service_end_link.start()
invocation_end_link.start()
invocation_grpc_link.start()
service_grpc_link.start()
generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
# TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
group = next(iter(methods))[0]
# TODO(nathaniel): Add a "cardinalities_by_group" attribute to
# _digest.TestServiceDigest.
cardinalities = {
method: method_object.cardinality()
for (group, method), method_object in six.iteritems(methods)}
dynamic_stub = crust_implementations.dynamic_stub(
invocation_end_link, group, cardinalities, pool)
return generic_stub, {group: dynamic_stub}, (
invocation_end_link, invocation_grpc_link, service_grpc_link,
service_end_link, pool)
def destantiate(self, memo):
(invocation_end_link, invocation_grpc_link, service_grpc_link,
service_end_link, pool) = memo
invocation_end_link.stop(0).wait()
invocation_grpc_link.stop()
service_grpc_link.begin_stop()
service_end_link.stop(0).wait()
service_grpc_link.end_stop()
invocation_end_link.join_link(utilities.NULL_LINK)
invocation_grpc_link.join_link(utilities.NULL_LINK)
service_grpc_link.join_link(utilities.NULL_LINK)
service_end_link.join_link(utilities.NULL_LINK)
pool.shutdown(wait=True)
def invocation_metadata(self):
return grpc_test_common.INVOCATION_INITIAL_METADATA
def initial_metadata(self):
return grpc_test_common.SERVICE_INITIAL_METADATA
def terminal_metadata(self):
return grpc_test_common.SERVICE_TERMINAL_METADATA
def code(self):
return beta_interfaces.StatusCode.OK
def details(self):
return grpc_test_common.DETAILS
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
original_metadata, transmitted_metadata)
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,88 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A test of invocation-side code unconnected to an RPC server."""
import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc.framework.interfaces.links import links
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.links import test_cases
from tests.unit.framework.interfaces.links import test_utilities
_NULL_BEHAVIOR = lambda unused_argument: None
class LonelyInvocationLinkTest(unittest.TestCase):
def testUpAndDown(self):
channel = _intermediary_low.Channel('nonexistent:54321', None)
invocation_link = invocation.invocation_link(
channel, 'nonexistent', None, {}, {})
invocation_link.start()
invocation_link.stop()
def _test_lonely_invocation_with_termination(self, termination):
test_operation_id = object()
test_group = 'test package.Test Service'
test_method = 'test method'
invocation_link_mate = test_utilities.RecordingLink()
channel = _intermediary_low.Channel('nonexistent:54321', None)
invocation_link = invocation.invocation_link(
channel, 'nonexistent', None, {}, {})
invocation_link.join_link(invocation_link_mate)
invocation_link.start()
ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None,
None, None, None, None, termination, None)
invocation_link.accept_ticket(ticket)
invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
self.assertIsNot(
invocation_link_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
def testLonelyInvocationLinkWithCommencementTicket(self):
self._test_lonely_invocation_with_termination(None)
def testLonelyInvocationLinkWithEntireTicket(self):
self._test_lonely_invocation_with_termination(
links.Ticket.Termination.COMPLETION)
if __name__ == '__main__':
unittest.main()

@ -1,239 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests transmission of tickets across gRPC-on-the-wire."""
import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.interfaces.links import links
from tests.unit import test_common
from tests.unit._links import _proto_scenarios
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.links import test_cases
from tests.unit.framework.interfaces.links import test_utilities
_IDENTITY = lambda x: x
class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
def create_transmitting_links(self):
service_link = service.service_link(
{self.group_and_method(): self.deserialize_request},
{self.group_and_method(): self.serialize_response})
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, 'localhost', None,
{self.group_and_method(): self.serialize_request},
{self.group_and_method(): self.deserialize_response})
invocation_link.start()
return invocation_link, service_link
def destroy_transmitting_links(self, invocation_side_link, service_side_link):
invocation_side_link.stop()
service_side_link.begin_stop()
service_side_link.end_stop()
def create_invocation_initial_metadata(self):
return (
('first_invocation_initial_metadata_key', 'just a string value'),
('second_invocation_initial_metadata_key', '0123456789'),
('third_invocation_initial_metadata_key-bin', '\x00\x57' * 100),
)
def create_invocation_terminal_metadata(self):
return None
def create_service_initial_metadata(self):
return (
('first_service_initial_metadata_key', 'just another string value'),
('second_service_initial_metadata_key', '9876543210'),
('third_service_initial_metadata_key-bin', '\x00\x59\x02' * 100),
)
def create_service_terminal_metadata(self):
return (
('first_service_terminal_metadata_key', 'yet another string value'),
('second_service_terminal_metadata_key', 'abcdefghij'),
('third_service_terminal_metadata_key-bin', '\x00\x37' * 100),
)
def create_invocation_completion(self):
return None, None
def create_service_completion(self):
return (
beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!')
def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
self.assertTrue(
test_common.metadata_transmitted(
original_metadata, transmitted_metadata),
'%s erroneously transmitted as %s' % (
original_metadata, transmitted_metadata))
class RoundTripTest(unittest.TestCase):
def testZeroMessageRoundTrip(self):
test_operation_id = object()
test_group = 'test package.Test Group'
test_method = 'test method'
identity_transformation = {(test_group, test_method): _IDENTITY}
test_code = beta_interfaces.StatusCode.OK
test_message = 'a test message'
service_link = service.service_link(
identity_transformation, identity_transformation)
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, None, None, identity_transformation, identity_transformation)
invocation_mate = test_utilities.RecordingLink()
invocation_link.join_link(invocation_mate)
invocation_link.start()
invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
None, None, None, None, links.Ticket.Termination.COMPLETION, None)
invocation_link.accept_ticket(invocation_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_ticket = links.Ticket(
service_mate.tickets()[-1].operation_id, 0, None, None, None, None,
None, None, None, None, test_code, test_message,
links.Ticket.Termination.COMPLETION, None)
service_link.accept_ticket(service_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
service_link.begin_stop()
service_link.end_stop()
self.assertIs(
service_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
self.assertIs(
invocation_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
self.assertIs(invocation_mate.tickets()[-1].code, test_code)
self.assertEqual(invocation_mate.tickets()[-1].message, test_message.encode())
def _perform_scenario_test(self, scenario):
test_operation_id = object()
test_group, test_method = scenario.group_and_method()
test_code = beta_interfaces.StatusCode.OK
test_message = 'a scenario test message'
service_link = service.service_link(
{(test_group, test_method): scenario.deserialize_request},
{(test_group, test_method): scenario.serialize_response})
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, 'localhost', None,
{(test_group, test_method): scenario.serialize_request},
{(test_group, test_method): scenario.deserialize_response})
invocation_mate = test_utilities.RecordingLink()
invocation_link.join_link(invocation_mate)
invocation_link.start()
invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
None, None, None, None, None, None)
invocation_link.accept_ticket(invocation_ticket)
requests = scenario.requests()
for request_index, request in enumerate(requests):
request_ticket = links.Ticket(
test_operation_id, 1 + request_index, None, None, None, None, 1, None,
request, None, None, None, None, None)
invocation_link.accept_ticket(request_ticket)
service_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index))
response_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_index, None, None,
None, None, 1, None, scenario.response_for_request(request), None,
None, None, None, None)
service_link.accept_ticket(response_ticket)
invocation_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index))
request_count = len(requests)
invocation_completion_ticket = links.Ticket(
test_operation_id, request_count + 1, None, None, None, None, None,
None, None, None, None, None, links.Ticket.Termination.COMPLETION,
None)
invocation_link.accept_ticket(invocation_completion_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_completion_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_count, None, None, None,
None, None, None, None, None, test_code, test_message,
links.Ticket.Termination.COMPLETION, None)
service_link.accept_ticket(service_completion_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
service_link.begin_stop()
service_link.end_stop()
observed_requests = tuple(
ticket.payload for ticket in service_mate.tickets()
if ticket.payload is not None)
observed_responses = tuple(
ticket.payload for ticket in invocation_mate.tickets()
if ticket.payload is not None)
self.assertTrue(scenario.verify_requests(observed_requests))
self.assertTrue(scenario.verify_responses(observed_responses))
def testEmptyScenario(self):
self._perform_scenario_test(_proto_scenarios.EmptyScenario())
def testBidirectionallyUnaryScenario(self):
self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
def testBidirectionallyStreamingScenario(self):
self._perform_scenario_test(
_proto_scenarios.BidirectionallyStreamingScenario())
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,113 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests Face interface compliance of the crust-over-core stack."""
import collections
import unittest
import six
from grpc.framework.core import implementations as core_implementations
from grpc.framework.crust import implementations as crust_implementations
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.links import utilities
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.face import test_cases
from tests.unit.framework.interfaces.face import test_interfaces
from tests.unit.framework.interfaces.links import test_utilities
class _Implementation(test_interfaces.Implementation):
def instantiate(
self, methods, method_implementations, multi_method_implementation):
pool = logging_pool.pool(test_constants.POOL_SIZE)
servicer = crust_implementations.servicer(
method_implementations, multi_method_implementation, pool)
service_end_link = core_implementations.service_end_link(
servicer, test_constants.DEFAULT_TIMEOUT,
test_constants.MAXIMUM_TIMEOUT)
invocation_end_link = core_implementations.invocation_end_link()
invocation_end_link.join_link(service_end_link)
service_end_link.join_link(invocation_end_link)
service_end_link.start()
invocation_end_link.start()
generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
# TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
group = next(iter(methods))[0]
# TODO(nathaniel): Add a "cardinalities_by_group" attribute to
# _digest.TestServiceDigest.
cardinalities = {
method: method_object.cardinality()
for (group, method), method_object in six.iteritems(methods)}
dynamic_stub = crust_implementations.dynamic_stub(
invocation_end_link, group, cardinalities, pool)
return generic_stub, {group: dynamic_stub}, (
invocation_end_link, service_end_link, pool)
def destantiate(self, memo):
invocation_end_link, service_end_link, pool = memo
invocation_end_link.stop(0).wait()
service_end_link.stop(0).wait()
invocation_end_link.join_link(utilities.NULL_LINK)
service_end_link.join_link(utilities.NULL_LINK)
pool.shutdown(wait=True)
def invocation_metadata(self):
return object()
def initial_metadata(self):
return object()
def terminal_metadata(self):
return object()
def code(self):
return object()
def details(self):
return object()
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is transmitted_metadata
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,96 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests the RPC Framework Core's implementation of the Base interface."""
import logging
import random
import time
import unittest
from grpc.framework.core import implementations
from grpc.framework.interfaces.base import utilities
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.base import test_cases
from tests.unit.framework.interfaces.base import test_interfaces
class _Implementation(test_interfaces.Implementation):
def __init__(self):
self._invocation_initial_metadata = object()
self._service_initial_metadata = object()
self._invocation_terminal_metadata = object()
self._service_terminal_metadata = object()
def instantiate(self, serializations, servicer):
invocation = implementations.invocation_end_link()
service = implementations.service_end_link(
servicer, test_constants.DEFAULT_TIMEOUT,
test_constants.MAXIMUM_TIMEOUT)
invocation.join_link(service)
service.join_link(invocation)
return invocation, service, None
def destantiate(self, memo):
pass
def invocation_initial_metadata(self):
return self._invocation_initial_metadata
def service_initial_metadata(self):
return self._service_initial_metadata
def invocation_completion(self):
return utilities.completion(self._invocation_terminal_metadata, None, None)
def service_completion(self):
return utilities.completion(self._service_terminal_metadata, None, None)
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return transmitted_metadata is original_metadata
def completion_transmitted(self, original_completion, transmitted_completion):
return (
(original_completion.terminal_metadata is
transmitted_completion.terminal_metadata) and
original_completion.code is transmitted_completion.code and
original_completion.message is transmitted_completion.message
)
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,151 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of the later module."""
import threading
import time
import unittest
from grpc.framework.foundation import later
TICK = 0.1
class LaterTest(unittest.TestCase):
def test_simple_delay(self):
lock = threading.Lock()
cell = [0]
return_value = object()
def computation():
with lock:
cell[0] += 1
return return_value
computation_future = later.later(TICK * 2, computation)
self.assertFalse(computation_future.done())
self.assertFalse(computation_future.cancelled())
time.sleep(TICK)
self.assertFalse(computation_future.done())
self.assertFalse(computation_future.cancelled())
with lock:
self.assertEqual(0, cell[0])
time.sleep(TICK * 2)
self.assertTrue(computation_future.done())
self.assertFalse(computation_future.cancelled())
with lock:
self.assertEqual(1, cell[0])
self.assertEqual(return_value, computation_future.result())
def test_callback(self):
lock = threading.Lock()
cell = [0]
callback_called = [False]
future_passed_to_callback = [None]
def computation():
with lock:
cell[0] += 1
computation_future = later.later(TICK * 2, computation)
def callback(outcome):
with lock:
callback_called[0] = True
future_passed_to_callback[0] = outcome
computation_future.add_done_callback(callback)
time.sleep(TICK)
with lock:
self.assertFalse(callback_called[0])
time.sleep(TICK * 2)
with lock:
self.assertTrue(callback_called[0])
self.assertTrue(future_passed_to_callback[0].done())
callback_called[0] = False
future_passed_to_callback[0] = None
computation_future.add_done_callback(callback)
with lock:
self.assertTrue(callback_called[0])
self.assertTrue(future_passed_to_callback[0].done())
def test_cancel(self):
lock = threading.Lock()
cell = [0]
callback_called = [False]
future_passed_to_callback = [None]
def computation():
with lock:
cell[0] += 1
computation_future = later.later(TICK * 2, computation)
def callback(outcome):
with lock:
callback_called[0] = True
future_passed_to_callback[0] = outcome
computation_future.add_done_callback(callback)
time.sleep(TICK)
with lock:
self.assertFalse(callback_called[0])
computation_future.cancel()
self.assertTrue(computation_future.cancelled())
self.assertFalse(computation_future.running())
self.assertTrue(computation_future.done())
with lock:
self.assertTrue(callback_called[0])
self.assertTrue(future_passed_to_callback[0].cancelled())
def test_result(self):
lock = threading.Lock()
cell = [0]
callback_called = [False]
future_passed_to_callback_cell = [None]
return_value = object()
def computation():
with lock:
cell[0] += 1
return return_value
computation_future = later.later(TICK * 2, computation)
def callback(future_passed_to_callback):
with lock:
callback_called[0] = True
future_passed_to_callback_cell[0] = future_passed_to_callback
computation_future.add_done_callback(callback)
returned_value = computation_future.result()
self.assertEqual(return_value, returned_value)
# The callback may not yet have been called! Sleep a tick.
time.sleep(TICK)
with lock:
self.assertTrue(callback_called[0])
self.assertEqual(return_value, future_passed_to_callback_cell[0].result())
if __name__ == '__main__':
unittest.main(verbosity=2)
Loading…
Cancel
Save