Merge pull request #12484 from nathanielmanistaatgoogle/more_cython_tests

Add more Cython-layer tests.
pull/12511/head
Nathaniel Manista 7 years ago committed by GitHub
commit fb056f0fe6
  1. 2
      src/python/grpcio_tests/tests/tests.json
  2. 118
      src/python/grpcio_tests/tests/unit/_cython/_common.py
  3. 131
      src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
  4. 126
      src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py

@ -26,6 +26,8 @@
"unit._credentials_test.CredentialsTest",
"unit._cython._cancel_many_calls_test.CancelManyCallsTest",
"unit._cython._channel_test.ChannelTest",
"unit._cython._no_messages_server_completion_queue_per_call_test.Test",
"unit._cython._no_messages_single_server_completion_queue_test.Test",
"unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
"unit._cython.cygrpc_test.InsecureServerInsecureClient",
"unit._cython.cygrpc_test.SecureServerSecureClient",

@ -0,0 +1,118 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common utilities for tests of the Cython layer of gRPC Python."""
import collections
import threading
from grpc._cython import cygrpc
RPC_COUNT = 4000
INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
EMPTY_FLAGS = 0
INVOCATION_METADATA = cygrpc.Metadata(
(cygrpc.Metadatum(b'client-md-key', b'client-md-key'),
cygrpc.Metadatum(b'client-md-key-bin', b'\x00\x01' * 3000),))
INITIAL_METADATA = cygrpc.Metadata(
(cygrpc.Metadatum(b'server-initial-md-key', b'server-initial-md-value'),
cygrpc.Metadatum(b'server-initial-md-key-bin', b'\x00\x02' * 3000),))
TRAILING_METADATA = cygrpc.Metadata(
(cygrpc.Metadatum(b'server-trailing-md-key', b'server-trailing-md-value'),
cygrpc.Metadatum(b'server-trailing-md-key-bin', b'\x00\x03' * 3000),))
class QueueDriver(object):
def __init__(self, condition, completion_queue):
self._condition = condition
self._completion_queue = completion_queue
self._due = collections.defaultdict(int)
self._events = collections.defaultdict(list)
def add_due(self, tags):
if not self._due:
def in_thread():
while True:
event = self._completion_queue.poll()
with self._condition:
self._events[event.tag].append(event)
self._due[event.tag] -= 1
self._condition.notify_all()
if self._due[event.tag] <= 0:
self._due.pop(event.tag)
if not self._due:
return
thread = threading.Thread(target=in_thread)
thread.start()
for tag in tags:
self._due[tag] += 1
def event_with_tag(self, tag):
with self._condition:
while True:
if self._events[tag]:
return self._events[tag].pop(0)
else:
self._condition.wait()
def execute_many_times(behavior):
return tuple(behavior() for _ in range(RPC_COUNT))
class OperationResult(
collections.namedtuple('OperationResult', (
'start_batch_result', 'completion_type', 'success',))):
pass
SUCCESSFUL_OPERATION_RESULT = OperationResult(
cygrpc.CallError.ok, cygrpc.CompletionType.operation_complete, True)
class RpcTest(object):
def setUp(self):
self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server(cygrpc.ChannelArgs([]))
self.server.register_completion_queue(self.server_completion_queue)
port = self.server.add_http2_port(b'[::]:0')
self.server.start()
self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(),
cygrpc.ChannelArgs([]))
self._server_shutdown_tag = 'server_shutdown_tag'
self.server_condition = threading.Condition()
self.server_driver = QueueDriver(self.server_condition,
self.server_completion_queue)
with self.server_condition:
self.server_driver.add_due({
self._server_shutdown_tag,
})
self.client_condition = threading.Condition()
self.client_completion_queue = cygrpc.CompletionQueue()
self.client_driver = QueueDriver(self.client_condition,
self.client_completion_queue)
def tearDown(self):
self.server.shutdown(self.server_completion_queue,
self._server_shutdown_tag)
self.server.cancel_all_calls()

@ -0,0 +1,131 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test a corner-case at the level of the Cython API."""
import threading
import unittest
from grpc._cython import cygrpc
from tests.unit._cython import _common
class Test(_common.RpcTest, unittest.TestCase):
def _do_rpcs(self):
server_call_condition = threading.Condition()
server_call_completion_queue = cygrpc.CompletionQueue()
server_call_driver = _common.QueueDriver(server_call_condition,
server_call_completion_queue)
server_request_call_tag = 'server_request_call_tag'
server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
server_complete_rpc_tag = 'server_complete_rpc_tag'
with self.server_condition:
server_request_call_start_batch_result = self.server.request_call(
server_call_completion_queue, self.server_completion_queue,
server_request_call_tag)
self.server_driver.add_due({
server_request_call_tag,
})
client_call = self.channel.create_call(
None, _common.EMPTY_FLAGS, self.client_completion_queue,
b'/twinkies', None, _common.INFINITE_FUTURE)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
with self.client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch(
cygrpc.Operations([
cygrpc.operation_receive_initial_metadata(
_common.EMPTY_FLAGS),
]), client_receive_initial_metadata_tag))
client_complete_rpc_start_batch_result = client_call.start_client_batch(
cygrpc.Operations([
cygrpc.operation_send_initial_metadata(
_common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(
_common.EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(
_common.EMPTY_FLAGS),
]), client_complete_rpc_tag)
self.client_driver.add_due({
client_receive_initial_metadata_tag,
client_complete_rpc_tag,
})
server_request_call_event = self.server_driver.event_with_tag(
server_request_call_tag)
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
cygrpc.operation_send_initial_metadata(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_call_driver.add_due({
server_send_initial_metadata_tag,
})
server_send_initial_metadata_event = server_call_driver.event_with_tag(
server_send_initial_metadata_tag)
with server_call_condition:
server_complete_rpc_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
cygrpc.operation_receive_close_on_server(
_common.EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
b'test details', _common.EMPTY_FLAGS),
], server_complete_rpc_tag))
server_call_driver.add_due({
server_complete_rpc_tag,
})
server_complete_rpc_event = server_call_driver.event_with_tag(
server_complete_rpc_tag)
client_receive_initial_metadata_event = self.client_driver.event_with_tag(
client_receive_initial_metadata_tag)
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
return (_common.OperationResult(server_request_call_start_batch_result,
server_request_call_event.type,
server_request_call_event.success),
_common.OperationResult(
client_receive_initial_metadata_start_batch_result,
client_receive_initial_metadata_event.type,
client_receive_initial_metadata_event.success),
_common.OperationResult(client_complete_rpc_start_batch_result,
client_complete_rpc_event.type,
client_complete_rpc_event.success),
_common.OperationResult(
server_send_initial_metadata_start_batch_result,
server_send_initial_metadata_event.type,
server_send_initial_metadata_event.success),
_common.OperationResult(server_complete_rpc_start_batch_result,
server_complete_rpc_event.type,
server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
5] * _common.RPC_COUNT
actuallys = _common.execute_many_times(self._do_rpcs)
self.assertSequenceEqual(expecteds, actuallys)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,126 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test a corner-case at the level of the Cython API."""
import threading
import unittest
from grpc._cython import cygrpc
from tests.unit._cython import _common
class Test(_common.RpcTest, unittest.TestCase):
def _do_rpcs(self):
server_request_call_tag = 'server_request_call_tag'
server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
server_complete_rpc_tag = 'server_complete_rpc_tag'
with self.server_condition:
server_request_call_start_batch_result = self.server.request_call(
self.server_completion_queue, self.server_completion_queue,
server_request_call_tag)
self.server_driver.add_due({
server_request_call_tag,
})
client_call = self.channel.create_call(
None, _common.EMPTY_FLAGS, self.client_completion_queue,
b'/twinkies', None, _common.INFINITE_FUTURE)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
with self.client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch(
cygrpc.Operations([
cygrpc.operation_receive_initial_metadata(
_common.EMPTY_FLAGS),
]), client_receive_initial_metadata_tag))
client_complete_rpc_start_batch_result = client_call.start_client_batch(
cygrpc.Operations([
cygrpc.operation_send_initial_metadata(
_common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(
_common.EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(
_common.EMPTY_FLAGS),
]), client_complete_rpc_tag)
self.client_driver.add_due({
client_receive_initial_metadata_tag,
client_complete_rpc_tag,
})
server_request_call_event = self.server_driver.event_with_tag(
server_request_call_tag)
with self.server_condition:
server_send_initial_metadata_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
cygrpc.operation_send_initial_metadata(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
self.server_driver.add_due({
server_send_initial_metadata_tag,
})
server_send_initial_metadata_event = self.server_driver.event_with_tag(
server_send_initial_metadata_tag)
with self.server_condition:
server_complete_rpc_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
cygrpc.operation_receive_close_on_server(
_common.EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
b'test details', _common.EMPTY_FLAGS),
], server_complete_rpc_tag))
self.server_driver.add_due({
server_complete_rpc_tag,
})
server_complete_rpc_event = self.server_driver.event_with_tag(
server_complete_rpc_tag)
client_receive_initial_metadata_event = self.client_driver.event_with_tag(
client_receive_initial_metadata_tag)
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
return (_common.OperationResult(server_request_call_start_batch_result,
server_request_call_event.type,
server_request_call_event.success),
_common.OperationResult(
client_receive_initial_metadata_start_batch_result,
client_receive_initial_metadata_event.type,
client_receive_initial_metadata_event.success),
_common.OperationResult(client_complete_rpc_start_batch_result,
client_complete_rpc_event.type,
client_complete_rpc_event.success),
_common.OperationResult(
server_send_initial_metadata_start_batch_result,
server_send_initial_metadata_event.type,
server_send_initial_metadata_event.success),
_common.OperationResult(server_complete_rpc_start_batch_result,
server_complete_rpc_event.type,
server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
5] * _common.RPC_COUNT
actuallys = _common.execute_many_times(self._do_rpcs)
self.assertSequenceEqual(expecteds, actuallys)
if __name__ == '__main__':
unittest.main(verbosity=2)
Loading…
Cancel
Save