Adapt _intermediary_low_test to no backup poller

Pull request #1577 removes the backup poller internal to completion
queues so this test needs to use extra threads to drive the work done
inside the completion queues by continuously calling each completion
queue's get method.
pull/2090/head
Nathaniel Manista 10 years ago
parent 4c32de585a
commit c6636c01a5
  1. 95
      src/python/src/grpc/_adapter/_intermediary_low_test.py

@ -29,6 +29,8 @@
"""Tests for the old '_low'.""" """Tests for the old '_low'."""
import Queue
import threading
import time import time
import unittest import unittest
@ -43,6 +45,7 @@ _BYTE_SEQUENCE_SEQUENCE = tuple(
bytes(bytearray((row + column) % 256 for column in range(row))) bytes(bytearray((row + column) % 256 for column in range(row)))
for row in range(_STREAM_LENGTH)) for row in range(_STREAM_LENGTH))
class LonelyClientTest(unittest.TestCase): class LonelyClientTest(unittest.TestCase):
def testLonelyClient(self): def testLonelyClient(self):
@ -79,6 +82,14 @@ class LonelyClientTest(unittest.TestCase):
del completion_queue del completion_queue
def _drive_completion_queue(completion_queue, event_queue):
while True:
event = completion_queue.get(_FUTURE)
if event.kind is _low.Event.Kind.STOP:
break
event_queue.put(event)
class EchoTest(unittest.TestCase): class EchoTest(unittest.TestCase):
def setUp(self): def setUp(self):
@ -88,24 +99,26 @@ class EchoTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue) self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0') port = self.server.add_http2_addr('[::]:0')
self.server.start() self.server.start()
self.server_events = Queue.Queue()
self.server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue,
args=(self.server_completion_queue, self.server_events))
self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue() self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None) self.channel = _low.Channel('%s:%d' % (self.host, port), None)
self.client_events = Queue.Queue()
self.client_completion_queue_thread = threading.Thread(
target=_drive_completion_queue,
args=(self.client_completion_queue, self.client_events))
self.client_completion_queue_thread.start()
def tearDown(self): def tearDown(self):
self.server.stop() self.server.stop()
self.server_completion_queue.stop() self.server_completion_queue.stop()
self.client_completion_queue.stop() self.client_completion_queue.stop()
while True: self.server_completion_queue_thread.join()
event = self.server_completion_queue.get(_FUTURE) self.client_completion_queue_thread.join()
if event is not None and event.kind is _low.Event.Kind.STOP:
break
while True:
event = self.client_completion_queue.get(_FUTURE)
if event is not None and event.kind is _low.Event.Kind.STOP:
break
self.server_completion_queue = None
self.client_completion_queue = None
del self.server del self.server
def _perform_echo_test(self, test_data): def _perform_echo_test(self, test_data):
@ -144,7 +157,7 @@ class EchoTest(unittest.TestCase):
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag) self.server.service(service_tag)
service_accepted = self.server_completion_queue.get(_FUTURE) service_accepted = self.server_events.get()
self.assertIsNotNone(service_accepted) self.assertIsNotNone(service_accepted)
self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
self.assertIs(service_accepted.tag, service_tag) self.assertIs(service_accepted.tag, service_tag)
@ -165,7 +178,7 @@ class EchoTest(unittest.TestCase):
server_leading_binary_metadata_value) server_leading_binary_metadata_value)
server_call.premetadata() server_call.premetadata()
metadata_accepted = self.client_completion_queue.get(_FUTURE) metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted) self.assertIsNotNone(metadata_accepted)
self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
self.assertEqual(metadata_tag, metadata_accepted.tag) self.assertEqual(metadata_tag, metadata_accepted.tag)
@ -179,14 +192,14 @@ class EchoTest(unittest.TestCase):
for datum in test_data: for datum in test_data:
client_call.write(datum, write_tag) client_call.write(datum, write_tag)
write_accepted = self.client_completion_queue.get(_FUTURE) write_accepted = self.client_events.get()
self.assertIsNotNone(write_accepted) self.assertIsNotNone(write_accepted)
self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
self.assertIs(write_accepted.tag, write_tag) self.assertIs(write_accepted.tag, write_tag)
self.assertIs(write_accepted.write_accepted, True) self.assertIs(write_accepted.write_accepted, True)
server_call.read(read_tag) server_call.read(read_tag)
read_accepted = self.server_completion_queue.get(_FUTURE) read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted) self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag) self.assertEqual(read_tag, read_accepted.tag)
@ -194,14 +207,14 @@ class EchoTest(unittest.TestCase):
server_data.append(read_accepted.bytes) server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag) server_call.write(read_accepted.bytes, write_tag)
write_accepted = self.server_completion_queue.get(_FUTURE) write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted) self.assertIsNotNone(write_accepted)
self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
self.assertEqual(write_tag, write_accepted.tag) self.assertEqual(write_tag, write_accepted.tag)
self.assertTrue(write_accepted.write_accepted) self.assertTrue(write_accepted.write_accepted)
client_call.read(read_tag) client_call.read(read_tag)
read_accepted = self.client_completion_queue.get(_FUTURE) read_accepted = self.client_events.get()
self.assertIsNotNone(read_accepted) self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag) self.assertEqual(read_tag, read_accepted.tag)
@ -209,14 +222,14 @@ class EchoTest(unittest.TestCase):
client_data.append(read_accepted.bytes) client_data.append(read_accepted.bytes)
client_call.complete(complete_tag) client_call.complete(complete_tag)
complete_accepted = self.client_completion_queue.get(_FUTURE) complete_accepted = self.client_events.get()
self.assertIsNotNone(complete_accepted) self.assertIsNotNone(complete_accepted)
self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED) self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
self.assertIs(complete_accepted.tag, complete_tag) self.assertIs(complete_accepted.tag, complete_tag)
self.assertIs(complete_accepted.complete_accepted, True) self.assertIs(complete_accepted.complete_accepted, True)
server_call.read(read_tag) server_call.read(read_tag)
read_accepted = self.server_completion_queue.get(_FUTURE) read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted) self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag) self.assertEqual(read_tag, read_accepted.tag)
@ -228,8 +241,8 @@ class EchoTest(unittest.TestCase):
server_trailing_binary_metadata_value) server_trailing_binary_metadata_value)
server_call.status(_low.Status(_low.Code.OK, details), status_tag) server_call.status(_low.Status(_low.Code.OK, details), status_tag)
server_terminal_event_one = self.server_completion_queue.get(_FUTURE) server_terminal_event_one = self.server_events.get()
server_terminal_event_two = self.server_completion_queue.get(_FUTURE) server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED: if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
status_accepted = server_terminal_event_one status_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two rpc_accepted = server_terminal_event_two
@ -246,8 +259,8 @@ class EchoTest(unittest.TestCase):
self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status) self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
client_call.read(read_tag) client_call.read(read_tag)
client_terminal_event_one = self.client_completion_queue.get(_FUTURE) client_terminal_event_one = self.client_events.get()
client_terminal_event_two = self.client_completion_queue.get(_FUTURE) client_terminal_event_two = self.client_events.get()
if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = client_terminal_event_one read_accepted = client_terminal_event_one
finish_accepted = client_terminal_event_two finish_accepted = client_terminal_event_two
@ -303,22 +316,26 @@ class CancellationTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue) self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0') port = self.server.add_http2_addr('[::]:0')
self.server.start() self.server.start()
self.server_events = Queue.Queue()
self.server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue,
args=(self.server_completion_queue, self.server_events))
self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue() self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None) self.channel = _low.Channel('%s:%d' % (self.host, port), None)
self.client_events = Queue.Queue()
self.client_completion_queue_thread = threading.Thread(
target=_drive_completion_queue,
args=(self.client_completion_queue, self.client_events))
self.client_completion_queue_thread.start()
def tearDown(self): def tearDown(self):
self.server.stop() self.server.stop()
self.server_completion_queue.stop() self.server_completion_queue.stop()
self.client_completion_queue.stop() self.client_completion_queue.stop()
while True: self.server_completion_queue_thread.join()
event = self.server_completion_queue.get(0) self.client_completion_queue_thread.join()
if event is not None and event.kind is _low.Event.Kind.STOP:
break
while True:
event = self.client_completion_queue.get(0)
if event is not None and event.kind is _low.Event.Kind.STOP:
break
del self.server del self.server
def testCancellation(self): def testCancellation(self):
@ -340,29 +357,29 @@ class CancellationTest(unittest.TestCase):
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag) self.server.service(service_tag)
service_accepted = self.server_completion_queue.get(_FUTURE) service_accepted = self.server_events.get()
server_call = service_accepted.service_acceptance.call server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag) server_call.accept(self.server_completion_queue, finish_tag)
server_call.premetadata() server_call.premetadata()
metadata_accepted = self.client_completion_queue.get(_FUTURE) metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted) self.assertIsNotNone(metadata_accepted)
for datum in test_data: for datum in test_data:
client_call.write(datum, write_tag) client_call.write(datum, write_tag)
write_accepted = self.client_completion_queue.get(_FUTURE) write_accepted = self.client_events.get()
server_call.read(read_tag) server_call.read(read_tag)
read_accepted = self.server_completion_queue.get(_FUTURE) read_accepted = self.server_events.get()
server_data.append(read_accepted.bytes) server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag) server_call.write(read_accepted.bytes, write_tag)
write_accepted = self.server_completion_queue.get(_FUTURE) write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted) self.assertIsNotNone(write_accepted)
client_call.read(read_tag) client_call.read(read_tag)
read_accepted = self.client_completion_queue.get(_FUTURE) read_accepted = self.client_events.get()
client_data.append(read_accepted.bytes) client_data.append(read_accepted.bytes)
client_call.cancel() client_call.cancel()
@ -373,8 +390,8 @@ class CancellationTest(unittest.TestCase):
server_call.read(read_tag) server_call.read(read_tag)
server_terminal_event_one = self.server_completion_queue.get(_FUTURE) server_terminal_event_one = self.server_events.get()
server_terminal_event_two = self.server_completion_queue.get(_FUTURE) server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = server_terminal_event_one read_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two rpc_accepted = server_terminal_event_two
@ -388,7 +405,7 @@ class CancellationTest(unittest.TestCase):
self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status) self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
finish_event = self.client_completion_queue.get(_FUTURE) finish_event = self.client_events.get()
self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'), self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'),
finish_event.status) finish_event.status)

Loading…
Cancel
Save