Merge pull request #5660 from leifurhauks/py3_test_fixes

fixes to tests for py2/3 syntax compatibility
pull/5869/merge
Jan Tattermusch 9 years ago
commit 609226a90f
  1. 2
      src/python/grpcio/tests/_result.py
  2. 9
      src/python/grpcio/tests/_runner.py
  3. 4
      src/python/grpcio/tests/interop/methods.py
  4. 54
      src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
  5. 16
      src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
  6. 12
      src/python/grpcio/tests/unit/framework/interfaces/base/_control.py
  7. 4
      src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
  8. 4
      src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py

@ -204,7 +204,7 @@ class AugmentedResult(unittest.TestResult):
""" """
case_id = self.id_map(test) case_id = self.id_map(test)
self.cases[case_id] = self.cases[case_id].updated( self.cases[case_id] = self.cases[case_id].updated(
stdout=stdout, stderr=stderr) stdout=stdout.decode(), stderr=stderr.decode())
def augmented_results(self, filter): def augmented_results(self, filter):
"""Convenience method to retrieve filtered case results. """Convenience method to retrieve filtered case results.

@ -42,6 +42,7 @@ import time
import unittest import unittest
import uuid import uuid
import six
from six import moves from six import moves
from tests import _loader from tests import _loader
@ -95,6 +96,8 @@ class CaptureFile(object):
Arguments: Arguments:
value (str): What to write to the original file. value (str): What to write to the original file.
""" """
if six.PY3 and not isinstance(value, six.binary_type):
value = bytes(value, 'ascii')
if self._saved_fd is None: if self._saved_fd is None:
os.write(self._redirect_fd, value) os.write(self._redirect_fd, value)
else: else:
@ -171,9 +174,9 @@ class Runner(object):
result.stopTestRun() result.stopTestRun()
stdout_pipe.write_bypass(result_out.getvalue()) stdout_pipe.write_bypass(result_out.getvalue())
stdout_pipe.write_bypass( stdout_pipe.write_bypass(
'\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output())) '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output().decode()))
stderr_pipe.write_bypass( stderr_pipe.write_bypass(
'\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output())) '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output().decode()))
os._exit(1) os._exit(1)
signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGSEGV, fault_handler) signal.signal(signal.SIGSEGV, fault_handler)
@ -216,7 +219,7 @@ class Runner(object):
sys.stdout.write(result_out.getvalue()) sys.stdout.write(result_out.getvalue())
sys.stdout.flush() sys.stdout.flush()
signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL)
with open('report.xml', 'w') as report_xml_file: with open('report.xml', 'wb') as report_xml_file:
_result.jenkins_junit_xml(result).write(report_xml_file) _result.jenkins_junit_xml(result).write(report_xml_file)
return result return result

@ -29,6 +29,8 @@
"""Implementations of interoperability test methods.""" """Implementations of interoperability test methods."""
from __future__ import print_function
import enum import enum
import json import json
import os import os
@ -208,7 +210,7 @@ def _ping_pong(stub):
with stub, _Pipe() as pipe: with stub, _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
print 'Starting ping-pong with response iterator %s' % response_iterator print('Starting ping-pong with response iterator %s' % response_iterator)
for response_size, payload_size in zip( for response_size, payload_size in zip(
request_response_sizes, request_payload_sizes): request_response_sizes, request_payload_sizes):
request = messages_pb2.StreamingOutputCallRequest( request = messages_pb2.StreamingOutputCallRequest(

@ -1,4 +1,4 @@
# Copyright 2015, Google Inc. # Copyright 2015-2016, Google Inc.
# All rights reserved. # All rights reserved.
# #
# Redistribution and use in source and binary forms, with or without # Redistribution and use in source and binary forms, with or without
@ -42,6 +42,8 @@ import threading
import time import time
import unittest import unittest
from six import moves
from grpc.beta import implementations from grpc.beta import implementations
from grpc.framework.foundation import future from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face from grpc.framework.interfaces.face import face
@ -250,7 +252,7 @@ class PythonPluginTest(unittest.TestCase):
def testImportAttributes(self): def testImportAttributes(self):
# check that we can access the generated module and its members. # check that we can access the generated module and its members.
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None)) self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None)) self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None)) self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
@ -258,13 +260,13 @@ class PythonPluginTest(unittest.TestCase):
def testUpDown(self): def testUpDown(self):
import protoc_plugin_test_pb2 as test_pb2 import protoc_plugin_test_pb2 as test_pb2
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (servicer, stub): with _CreateService(test_pb2) as (servicer, stub):
request = test_pb2.SimpleRequest(response_size=13) request = test_pb2.SimpleRequest(response_size=13)
def testUnaryCall(self): def testUnaryCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
request = test_pb2.SimpleRequest(response_size=13) request = test_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT) response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
@ -273,7 +275,7 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCallFuture(self): def testUnaryCallFuture(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request = test_pb2.SimpleRequest(response_size=13) request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
# Check that the call does not block waiting for the server to respond. # Check that the call does not block waiting for the server to respond.
@ -286,7 +288,7 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCallFutureExpired(self): def testUnaryCallFutureExpired(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
request = test_pb2.SimpleRequest(response_size=13) request = test_pb2.SimpleRequest(response_size=13)
with methods.pause(): with methods.pause():
@ -297,7 +299,7 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCallFutureCancelled(self): def testUnaryCallFutureCancelled(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request = test_pb2.SimpleRequest(response_size=13) request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.pause(): with methods.pause():
@ -307,7 +309,7 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCallFutureFailed(self): def testUnaryCallFutureFailed(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request = test_pb2.SimpleRequest(response_size=13) request = test_pb2.SimpleRequest(response_size=13)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.fail(): with methods.fail():
@ -317,20 +319,20 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingOutputCall(self): def testStreamingOutputCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request = _streaming_output_request(test_pb2) request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
responses = stub.StreamingOutputCall( responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT) request, test_constants.LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall( expected_responses = methods.StreamingOutputCall(
request, 'not a real RpcContext!') request, 'not a real RpcContext!')
for expected_response, response in itertools.izip_longest( for expected_response, response in moves.zip_longest(
expected_responses, responses): expected_responses, responses):
self.assertEqual(expected_response, response) self.assertEqual(expected_response, response)
def testStreamingOutputCallExpired(self): def testStreamingOutputCallExpired(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request = _streaming_output_request(test_pb2) request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.pause(): with methods.pause():
@ -341,7 +343,7 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingOutputCallCancelled(self): def testStreamingOutputCallCancelled(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request = _streaming_output_request(test_pb2) request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2) as (unused_methods, stub): with _CreateService(test_pb2) as (unused_methods, stub):
responses = stub.StreamingOutputCall( responses = stub.StreamingOutputCall(
@ -353,7 +355,7 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingOutputCallFailed(self): def testStreamingOutputCallFailed(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request = _streaming_output_request(test_pb2) request = _streaming_output_request(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.fail(): with methods.fail():
@ -364,7 +366,7 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCall(self): def testStreamingInputCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
response = stub.StreamingInputCall( response = stub.StreamingInputCall(
_streaming_input_request_iterator(test_pb2), _streaming_input_request_iterator(test_pb2),
@ -375,7 +377,7 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallFuture(self): def testStreamingInputCallFuture(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.pause(): with methods.pause():
response_future = stub.StreamingInputCall.future( response_future = stub.StreamingInputCall.future(
@ -388,7 +390,7 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallFutureExpired(self): def testStreamingInputCallFutureExpired(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.pause(): with methods.pause():
response_future = stub.StreamingInputCall.future( response_future = stub.StreamingInputCall.future(
@ -401,7 +403,7 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallFutureCancelled(self): def testStreamingInputCallFutureCancelled(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.pause(): with methods.pause():
response_future = stub.StreamingInputCall.future( response_future = stub.StreamingInputCall.future(
@ -414,7 +416,7 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallFutureFailed(self): def testStreamingInputCallFutureFailed(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.fail(): with methods.fail():
response_future = stub.StreamingInputCall.future( response_future = stub.StreamingInputCall.future(
@ -424,19 +426,19 @@ class PythonPluginTest(unittest.TestCase):
def testFullDuplexCall(self): def testFullDuplexCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
responses = stub.FullDuplexCall( responses = stub.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), test_constants.LONG_TIMEOUT) _full_duplex_request_iterator(test_pb2), test_constants.LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall( expected_responses = methods.FullDuplexCall(
_full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
for expected_response, response in itertools.izip_longest( for expected_response, response in moves.zip_longest(
expected_responses, responses): expected_responses, responses):
self.assertEqual(expected_response, response) self.assertEqual(expected_response, response)
def testFullDuplexCallExpired(self): def testFullDuplexCallExpired(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request_iterator = _full_duplex_request_iterator(test_pb2) request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.pause(): with methods.pause():
@ -447,7 +449,7 @@ class PythonPluginTest(unittest.TestCase):
def testFullDuplexCallCancelled(self): def testFullDuplexCallCancelled(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
request_iterator = _full_duplex_request_iterator(test_pb2) request_iterator = _full_duplex_request_iterator(test_pb2)
responses = stub.FullDuplexCall( responses = stub.FullDuplexCall(
@ -459,7 +461,7 @@ class PythonPluginTest(unittest.TestCase):
def testFullDuplexCallFailed(self): def testFullDuplexCallFailed(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
request_iterator = _full_duplex_request_iterator(test_pb2) request_iterator = _full_duplex_request_iterator(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
with methods.fail(): with methods.fail():
@ -471,7 +473,7 @@ class PythonPluginTest(unittest.TestCase):
def testHalfDuplexCall(self): def testHalfDuplexCall(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
with _CreateService(test_pb2) as (methods, stub): with _CreateService(test_pb2) as (methods, stub):
def half_duplex_request_iterator(): def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest() request = test_pb2.StreamingOutputCallRequest()
@ -485,13 +487,13 @@ class PythonPluginTest(unittest.TestCase):
half_duplex_request_iterator(), test_constants.LONG_TIMEOUT) half_duplex_request_iterator(), test_constants.LONG_TIMEOUT)
expected_responses = methods.HalfDuplexCall( expected_responses = methods.HalfDuplexCall(
half_duplex_request_iterator(), 'not a real RpcContext!') half_duplex_request_iterator(), 'not a real RpcContext!')
for check in itertools.izip_longest(expected_responses, responses): for check in moves.zip_longest(expected_responses, responses):
expected_response, response = check expected_response, response = check
self.assertEqual(expected_response, response) self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self): def testHalfDuplexCallWedged(self):
import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
reload(test_pb2) moves.reload_module(test_pb2)
condition = threading.Condition() condition = threading.Condition()
wait_cell = [False] wait_cell = [False]
@contextlib.contextmanager @contextlib.contextmanager

@ -1,4 +1,4 @@
# Copyright 2015, Google Inc. # Copyright 2015-2016, Google Inc.
# All rights reserved. # All rights reserved.
# #
# Redistribution and use in source and binary forms, with or without # Redistribution and use in source and binary forms, with or without
@ -29,11 +29,13 @@
"""Tests for the old '_low'.""" """Tests for the old '_low'."""
import Queue
import threading import threading
import time import time
import unittest import unittest
import six
from six.moves import queue
from grpc._adapter import _intermediary_low as _low from grpc._adapter import _intermediary_low as _low
_STREAM_LENGTH = 300 _STREAM_LENGTH = 300
@ -67,7 +69,7 @@ class LonelyClientTest(unittest.TestCase):
second_event = completion_queue.get(after_deadline) second_event = completion_queue.get(after_deadline)
self.assertIsNotNone(second_event) self.assertIsNotNone(second_event)
kinds = [event.kind for event in (first_event, second_event)] kinds = [event.kind for event in (first_event, second_event)]
self.assertItemsEqual( six.assertCountEqual(self,
(_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH), (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
kinds) kinds)
@ -99,7 +101,7 @@ class EchoTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue) self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0') port = self.server.add_http2_addr('[::]:0')
self.server.start() self.server.start()
self.server_events = Queue.Queue() self.server_events = queue.Queue()
self.server_completion_queue_thread = threading.Thread( self.server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, target=_drive_completion_queue,
args=(self.server_completion_queue, self.server_events)) args=(self.server_completion_queue, self.server_events))
@ -107,7 +109,7 @@ class EchoTest(unittest.TestCase):
self.client_completion_queue = _low.CompletionQueue() self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None) self.channel = _low.Channel('%s:%d' % (self.host, port), None)
self.client_events = Queue.Queue() self.client_events = queue.Queue()
self.client_completion_queue_thread = threading.Thread( self.client_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, target=_drive_completion_queue,
args=(self.client_completion_queue, self.client_events)) args=(self.client_completion_queue, self.client_events))
@ -315,7 +317,7 @@ class CancellationTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue) self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0') port = self.server.add_http2_addr('[::]:0')
self.server.start() self.server.start()
self.server_events = Queue.Queue() self.server_events = queue.Queue()
self.server_completion_queue_thread = threading.Thread( self.server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, target=_drive_completion_queue,
args=(self.server_completion_queue, self.server_events)) args=(self.server_completion_queue, self.server_events))
@ -323,7 +325,7 @@ class CancellationTest(unittest.TestCase):
self.client_completion_queue = _low.CompletionQueue() self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None) self.channel = _low.Channel('%s:%d' % (self.host, port), None)
self.client_events = Queue.Queue() self.client_events = queue.Queue()
self.client_completion_queue_thread = threading.Thread( self.client_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, target=_drive_completion_queue,
args=(self.client_completion_queue, self.client_events)) args=(self.client_completion_queue, self.client_events))

@ -29,6 +29,8 @@
"""Part of the tests of the base interface of RPC Framework.""" """Part of the tests of the base interface of RPC Framework."""
from __future__ import division
import abc import abc
import collections import collections
import enum import enum
@ -47,8 +49,8 @@ from tests.unit.framework.interfaces.base import test_interfaces # pylint: disa
_GROUP = 'base test cases test group' _GROUP = 'base test cases test group'
_METHOD = 'base test cases test method' _METHOD = 'base test cases test method'
_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE / 20 _PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE // 20
_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE / 600 _MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE // 600
def _create_payload(randomness): def _create_payload(randomness):
@ -59,7 +61,7 @@ def _create_payload(randomness):
random_section = bytes( random_section = bytes(
bytearray( bytearray(
randomness.getrandbits(8) for _ in range(random_section_length))) randomness.getrandbits(8) for _ in range(random_section_length)))
sevens_section = '\x07' * (length - random_section_length) sevens_section = b'\x07' * (length - random_section_length)
return b''.join(randomness.sample((random_section, sevens_section), 2)) return b''.join(randomness.sample((random_section, sevens_section), 2))
@ -385,13 +387,13 @@ class _SequenceController(Controller):
return request + request return request + request
def deserialize_request(self, serialized_request): def deserialize_request(self, serialized_request):
return serialized_request[:len(serialized_request) / 2] return serialized_request[:len(serialized_request) // 2]
def serialize_response(self, response): def serialize_response(self, response):
return response * 3 return response * 3
def deserialize_response(self, serialized_response): def deserialize_response(self, serialized_response):
return serialized_response[2 * len(serialized_response) / 3:] return serialized_response[2 * len(serialized_response) // 3:]
def invocation(self): def invocation(self):
with self._condition: with self._condition:

@ -29,6 +29,8 @@
"""Test code for the Face layer of RPC Framework.""" """Test code for the Face layer of RPC Framework."""
from __future__ import division
import abc import abc
import itertools import itertools
import unittest import unittest
@ -182,7 +184,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice( some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices), futures.as_completed(response_futures_to_indices),
test_constants.PARALLELISM / 2) test_constants.PARALLELISM // 2)
for response_future in some_completed_response_futures_iterator: for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future] index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self) test_messages.verify(requests[index], response_future.result(), self)

@ -29,6 +29,8 @@
"""Test code for the Face layer of RPC Framework.""" """Test code for the Face layer of RPC Framework."""
from __future__ import division
import abc import abc
import contextlib import contextlib
import itertools import itertools
@ -277,7 +279,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice( some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices), futures.as_completed(response_futures_to_indices),
test_constants.PARALLELISM / 2) test_constants.PARALLELISM // 2)
for response_future in some_completed_response_futures_iterator: for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future] index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self) test_messages.verify(requests[index], response_future.result(), self)

Loading…
Cancel
Save