Merge pull request #6602 from nathanielmanistaatgoogle/test-parallelism

Break PARALLELISM test constant in two
pull/6617/head
Jan Tattermusch 9 years ago
commit 9a0da591ce
  1. 2
      src/python/grpcio/tests/unit/_cython/_channel_test.py
  2. 9
      src/python/grpcio/tests/unit/framework/common/test_constants.py
  3. 10
      src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
  4. 8
      src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py

@ -60,7 +60,7 @@ def _create_loop_destroy():
def _in_parallel(behavior, arguments):
threads = tuple(
threading.Thread(target=behavior, args=arguments)
for _ in range(test_constants.PARALLELISM))
for _ in range(test_constants.THREAD_CONCURRENCY))
for thread in threads:
thread.start()
for thread in threads:

@ -49,8 +49,13 @@ STREAM_LENGTH = 200
# The size of payloads to transmit in tests.
PAYLOAD_SIZE = 256 * 1024 + 17
# The parallelism to use in tests of parallel RPCs.
PARALLELISM = 200
# The concurrency to use in tests of concurrent RPCs that will not create as
# many threads as RPCs.
RPC_CONCURRENCY = 200
# The concurrency to use in tests of concurrent RPCs that will create as many
# threads as RPCs.
THREAD_CONCURRENCY = 25
# The size of thread pools to use in tests.
POOL_SIZE = 10

@ -146,13 +146,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
test_messages.verify(second_request, second_response, self)
def testParallelInvocations(self):
pool = logging_pool.pool(test_constants.PARALLELISM)
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures = []
for _ in range(test_constants.PARALLELISM):
for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
@ -168,13 +168,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
pool.shutdown(wait=True)
def testWaitingForSomeButNotAllParallelInvocations(self):
pool = logging_pool.pool(test_constants.PARALLELISM)
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
for index in range(test_constants.PARALLELISM):
for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
@ -184,7 +184,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
test_constants.PARALLELISM // 2)
test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self)

@ -249,7 +249,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
for test_messages in test_messages_sequence:
requests = []
response_futures = []
for _ in range(test_constants.PARALLELISM):
for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
@ -263,13 +263,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
test_messages.verify(request, response, self)
def testWaitingForSomeButNotAllParallelInvocations(self):
pool = logging_pool.pool(test_constants.PARALLELISM)
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
for index in range(test_constants.PARALLELISM):
for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
inner_response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
@ -279,7 +279,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
test_constants.PARALLELISM // 2)
test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self)

Loading…
Cancel
Save