Merge pull request #14702 from nathanielmanistaatgoogle/_face_interface_test

Remove _face_interface_test.
pull/14556/head
Nathaniel Manista 7 years ago committed by GitHub
commit b20f768f57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/python/grpcio_tests/tests/tests.json
  2. 132
      src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py
  3. 13
      src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py
  4. 21
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py
  5. 13
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py
  6. 287
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
  7. 432
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py
  8. 508
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
  9. 198
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py
  10. 304
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py
  11. 390
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py
  12. 53
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py
  13. 212
      src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py

@ -218,12 +218,6 @@
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
"unit.beta._connectivity_channel_test.ConnectivityStatesTest",
"unit.beta._face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"unit.beta._face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"unit.beta._face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
"unit.beta._face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"unit.beta._face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
"unit.beta._face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"unit.beta._implementations_test.CallCredentialsTest",
"unit.beta._implementations_test.ChannelCredentialsTest",
"unit.beta._not_found_test.NotFoundTest",

@ -1,132 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests Face interface compliance of the gRPC Python Beta API."""
import collections
import unittest
import six
from grpc.beta import implementations
from grpc.beta import interfaces
from tests.unit import resources
from tests.unit import test_common as grpc_test_common
from tests.unit.beta import test_utilities
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.face import test_cases
from tests.unit.framework.interfaces.face import test_interfaces
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
class _SerializationBehaviors(
collections.namedtuple('_SerializationBehaviors', (
'request_serializers',
'request_deserializers',
'response_serializers',
'response_deserializers',
))):
pass
def _serialization_behaviors_from_test_methods(test_methods):
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for (group, method), test_method in six.iteritems(test_methods):
request_serializers[group, method] = test_method.serialize_request
request_deserializers[group, method] = test_method.deserialize_request
response_serializers[group, method] = test_method.serialize_response
response_deserializers[group, method] = test_method.deserialize_response
return _SerializationBehaviors(request_serializers, request_deserializers,
response_serializers, response_deserializers)
class _Implementation(test_interfaces.Implementation):
def instantiate(self, methods, method_implementations,
multi_method_implementation):
serialization_behaviors = _serialization_behaviors_from_test_methods(
methods)
# TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
service = next(iter(methods))[0]
# TODO(nathaniel): Add a "cardinalities_by_group" attribute to
# _digest.TestServiceDigest.
cardinalities = {
method: method_object.cardinality()
for (group, method), method_object in six.iteritems(methods)
}
server_options = implementations.server_options(
request_deserializers=serialization_behaviors.request_deserializers,
response_serializers=serialization_behaviors.response_serializers,
thread_pool_size=test_constants.POOL_SIZE)
server = implementations.server(
method_implementations, options=server_options)
server_credentials = implementations.ssl_server_credentials([
(
resources.private_key(),
resources.certificate_chain(),
),
])
port = server.add_secure_port('[::]:0', server_credentials)
server.start()
channel_credentials = implementations.ssl_channel_credentials(
resources.test_root_certificates())
channel = test_utilities.not_really_secure_channel(
'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE)
stub_options = implementations.stub_options(
request_serializers=serialization_behaviors.request_serializers,
response_deserializers=serialization_behaviors.
response_deserializers,
thread_pool_size=test_constants.POOL_SIZE)
generic_stub = implementations.generic_stub(
channel, options=stub_options)
dynamic_stub = implementations.dynamic_stub(
channel, service, cardinalities, options=stub_options)
return generic_stub, {service: dynamic_stub}, server
def destantiate(self, memo):
memo.stop(test_constants.SHORT_TIMEOUT).wait()
def invocation_metadata(self):
return grpc_test_common.INVOCATION_INITIAL_METADATA
def initial_metadata(self):
return grpc_test_common.SERVICE_INITIAL_METADATA
def terminal_metadata(self):
return grpc_test_common.SERVICE_TERMINAL_METADATA
def code(self):
return interfaces.StatusCode.OK
def details(self):
return grpc_test_common.DETAILS
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
original_metadata, transmitted_metadata)
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -1,13 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -1,21 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A test constant working around issue 3069."""
# test_constants is referenced from specification in this module.
from tests.unit.framework.common import test_constants # pylint: disable=unused-import
# TODO(issue 3069): Replace uses of this constant with
# test_constants.SHORT_TIMEOUT.
REALLY_SHORT_TIMEOUT = 0.1

@ -1,13 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -1,287 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test code for the Face layer of RPC Framework."""
from __future__ import division
import abc
import itertools
import unittest
from concurrent import futures
import six
# test_interfaces is referenced from specification in this module.
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.face import face
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
from tests.unit.framework.common import test_coverage
from tests.unit.framework.interfaces.face import _3069_test_constant
from tests.unit.framework.interfaces.face import _digest
from tests.unit.framework.interfaces.face import _stock_service
from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
class TestCase(
six.with_metaclass(abc.ABCMeta, test_coverage.Coverage,
unittest.TestCase)):
"""A test of the Face layer of RPC Framework.
Concrete subclasses must have an "implementation" attribute of type
test_interfaces.Implementation and an "invoker_constructor" attribute of type
_invocation.InvokerConstructor.
"""
NAME = 'BlockingInvocationInlineServiceTest'
def setUp(self):
"""See unittest.TestCase.setUp for full specification.
Overriding implementations must call this implementation.
"""
self._control = test_control.PauseFailControl()
self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE,
self._control, None)
generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
self._digest.methods, self._digest.inline_method_implementations,
None)
self._invoker = self.invoker_constructor.construct_invoker(
generic_stub, dynamic_stubs, self._digest.methods)
def tearDown(self):
"""See unittest.TestCase.tearDown for full specification.
Overriding implementations must call this implementation.
"""
self._invoker = None
self.implementation.destantiate(self._memo)
def testSuccessfulUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
response, call = self._invoker.blocking(group, method)(
request, test_constants.LONG_TIMEOUT, with_call=True)
test_messages.verify(request, response, self)
def testSuccessfulUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_stream_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
response_iterator = self._invoker.blocking(group, method)(
request, test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
def testSuccessfulStreamRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
response, call = self._invoker.blocking(group, method)(
iter(requests), test_constants.LONG_TIMEOUT, with_call=True)
test_messages.verify(requests, response, self)
def testSuccessfulStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_stream_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
response_iterator = self._invoker.blocking(group, method)(
iter(requests), test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
def testSequentialInvocations(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
first_request = test_messages.request()
second_request = test_messages.request()
first_response = self._invoker.blocking(group, method)(
first_request, test_constants.LONG_TIMEOUT)
test_messages.verify(first_request, first_response, self)
second_response = self._invoker.blocking(group, method)(
second_request, test_constants.LONG_TIMEOUT)
test_messages.verify(second_request, second_response, self)
def testParallelInvocations(self):
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures = []
for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
test_constants.LONG_TIMEOUT)
requests.append(request)
response_futures.append(response_future)
responses = [
response_future.result()
for response_future in response_futures
]
for request, response in zip(requests, responses):
test_messages.verify(request, response, self)
pool.shutdown(wait=True)
def testWaitingForSomeButNotAllParallelInvocations(self):
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
test_constants.LONG_TIMEOUT)
requests.append(request)
response_futures_to_indices[response_future] = index
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index],
response_future.result(), self)
pool.shutdown(wait=True)
@unittest.skip('Cancellation impossible with blocking control flow!')
def testCancelledUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@unittest.skip('Cancellation impossible with blocking control flow!')
def testCancelledUnaryRequestStreamResponse(self):
raise NotImplementedError()
@unittest.skip('Cancellation impossible with blocking control flow!')
def testCancelledStreamRequestUnaryResponse(self):
raise NotImplementedError()
@unittest.skip('Cancellation impossible with blocking control flow!')
def testCancelledStreamRequestStreamResponse(self):
raise NotImplementedError()
def testExpiredUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self._control.pause(), self.assertRaises(
face.ExpirationError):
self._invoker.blocking(group, method)(
request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_stream_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self._control.pause(), self.assertRaises(
face.ExpirationError):
response_iterator = self._invoker.blocking(group, method)(
request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self._control.pause(), self.assertRaises(
face.ExpirationError):
self._invoker.blocking(
group, method)(iter(requests),
_3069_test_constant.REALLY_SHORT_TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_stream_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self._control.pause(), self.assertRaises(
face.ExpirationError):
response_iterator = self._invoker.blocking(
group, method)(iter(requests),
_3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self._control.fail(), self.assertRaises(face.RemoteError):
self._invoker.blocking(group, method)(
request, test_constants.LONG_TIMEOUT)
def testFailedUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_stream_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self._control.fail(), self.assertRaises(face.RemoteError):
response_iterator = self._invoker.blocking(group, method)(
request, test_constants.LONG_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self._control.fail(), self.assertRaises(face.RemoteError):
self._invoker.blocking(group, method)(
iter(requests), test_constants.LONG_TIMEOUT)
def testFailedStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_stream_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self._control.fail(), self.assertRaises(face.RemoteError):
response_iterator = self._invoker.blocking(group, method)(
iter(requests), test_constants.LONG_TIMEOUT)
list(response_iterator)

@ -1,432 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Code for making a service.TestService more amenable to use in tests."""
import collections
import threading
import six
# test_control, _service, and test_interfaces are referenced from specification
# in this module.
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.foundation import stream
from grpc.framework.foundation import stream_util
from grpc.framework.interfaces.face import face
from tests.unit.framework.common import test_control # pylint: disable=unused-import
from tests.unit.framework.interfaces.face import _service # pylint: disable=unused-import
from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
_IDENTITY = lambda x: x
class TestServiceDigest(
collections.namedtuple('TestServiceDigest', (
'methods',
'inline_method_implementations',
'event_method_implementations',
'multi_method_implementation',
'unary_unary_messages_sequences',
'unary_stream_messages_sequences',
'stream_unary_messages_sequences',
'stream_stream_messages_sequences',
))):
"""A transformation of a service.TestService.
Attributes:
methods: A dict from method group-name pair to test_interfaces.Method object
describing the RPC methods that may be called during the test.
inline_method_implementations: A dict from method group-name pair to
face.MethodImplementation object to be used in tests of in-line calls to
behaviors under test.
event_method_implementations: A dict from method group-name pair to
face.MethodImplementation object to be used in tests of event-driven calls
to behaviors under test.
multi_method_implementation: A face.MultiMethodImplementation to be used in
tests of generic calls to behaviors under test.
unary_unary_messages_sequences: A dict from method group-name pair to
sequence of service.UnaryUnaryTestMessages objects to be used to test the
identified method.
unary_stream_messages_sequences: A dict from method group-name pair to
sequence of service.UnaryStreamTestMessages objects to be used to test the
identified method.
stream_unary_messages_sequences: A dict from method group-name pair to
sequence of service.StreamUnaryTestMessages objects to be used to test the
identified method.
stream_stream_messages_sequences: A dict from method group-name pair to
sequence of service.StreamStreamTestMessages objects to be used to test
the identified method.
"""
class _BufferingConsumer(stream.Consumer):
"""A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
def __init__(self):
self.consumed = []
self.terminated = False
def consume(self, value):
self.consumed.append(value)
def terminate(self):
self.terminated = True
def consume_and_terminate(self, value):
self.consumed.append(value)
self.terminated = True
class _InlineUnaryUnaryMethod(face.MethodImplementation):
def __init__(self, unary_unary_test_method, control):
self._test_method = unary_unary_test_method
self._control = control
self.cardinality = cardinality.Cardinality.UNARY_UNARY
self.style = style.Service.INLINE
def unary_unary_inline(self, request, context):
response_list = []
self._test_method.service(request, response_list.append, context,
self._control)
return response_list.pop(0)
class _EventUnaryUnaryMethod(face.MethodImplementation):
def __init__(self, unary_unary_test_method, control, pool):
self._test_method = unary_unary_test_method
self._control = control
self._pool = pool
self.cardinality = cardinality.Cardinality.UNARY_UNARY
self.style = style.Service.EVENT
def unary_unary_event(self, request, response_callback, context):
if self._pool is None:
self._test_method.service(request, response_callback, context,
self._control)
else:
self._pool.submit(self._test_method.service, request,
response_callback, context, self._control)
class _InlineUnaryStreamMethod(face.MethodImplementation):
def __init__(self, unary_stream_test_method, control):
self._test_method = unary_stream_test_method
self._control = control
self.cardinality = cardinality.Cardinality.UNARY_STREAM
self.style = style.Service.INLINE
def unary_stream_inline(self, request, context):
response_consumer = _BufferingConsumer()
self._test_method.service(request, response_consumer, context,
self._control)
for response in response_consumer.consumed:
yield response
class _EventUnaryStreamMethod(face.MethodImplementation):
def __init__(self, unary_stream_test_method, control, pool):
self._test_method = unary_stream_test_method
self._control = control
self._pool = pool
self.cardinality = cardinality.Cardinality.UNARY_STREAM
self.style = style.Service.EVENT
def unary_stream_event(self, request, response_consumer, context):
if self._pool is None:
self._test_method.service(request, response_consumer, context,
self._control)
else:
self._pool.submit(self._test_method.service, request,
response_consumer, context, self._control)
class _InlineStreamUnaryMethod(face.MethodImplementation):
def __init__(self, stream_unary_test_method, control):
self._test_method = stream_unary_test_method
self._control = control
self.cardinality = cardinality.Cardinality.STREAM_UNARY
self.style = style.Service.INLINE
def stream_unary_inline(self, request_iterator, context):
response_list = []
request_consumer = self._test_method.service(response_list.append,
context, self._control)
for request in request_iterator:
request_consumer.consume(request)
request_consumer.terminate()
return response_list.pop(0)
class _EventStreamUnaryMethod(face.MethodImplementation):
def __init__(self, stream_unary_test_method, control, pool):
self._test_method = stream_unary_test_method
self._control = control
self._pool = pool
self.cardinality = cardinality.Cardinality.STREAM_UNARY
self.style = style.Service.EVENT
def stream_unary_event(self, response_callback, context):
request_consumer = self._test_method.service(response_callback, context,
self._control)
if self._pool is None:
return request_consumer
else:
return stream_util.ThreadSwitchingConsumer(request_consumer,
self._pool)
class _InlineStreamStreamMethod(face.MethodImplementation):
def __init__(self, stream_stream_test_method, control):
self._test_method = stream_stream_test_method
self._control = control
self.cardinality = cardinality.Cardinality.STREAM_STREAM
self.style = style.Service.INLINE
def stream_stream_inline(self, request_iterator, context):
response_consumer = _BufferingConsumer()
request_consumer = self._test_method.service(response_consumer, context,
self._control)
for request in request_iterator:
request_consumer.consume(request)
while response_consumer.consumed:
yield response_consumer.consumed.pop(0)
response_consumer.terminate()
class _EventStreamStreamMethod(face.MethodImplementation):
def __init__(self, stream_stream_test_method, control, pool):
self._test_method = stream_stream_test_method
self._control = control
self._pool = pool
self.cardinality = cardinality.Cardinality.STREAM_STREAM
self.style = style.Service.EVENT
def stream_stream_event(self, response_consumer, context):
request_consumer = self._test_method.service(response_consumer, context,
self._control)
if self._pool is None:
return request_consumer
else:
return stream_util.ThreadSwitchingConsumer(request_consumer,
self._pool)
class _UnaryConsumer(stream.Consumer):
"""A Consumer that only allows consumption of exactly one value."""
def __init__(self, action):
self._lock = threading.Lock()
self._action = action
self._consumed = False
self._terminated = False
def consume(self, value):
with self._lock:
if self._consumed:
raise ValueError('Unary consumer already consumed!')
elif self._terminated:
raise ValueError('Unary consumer already terminated!')
else:
self._consumed = True
self._action(value)
def terminate(self):
with self._lock:
if not self._consumed:
raise ValueError('Unary consumer hasn\'t yet consumed!')
elif self._terminated:
raise ValueError('Unary consumer already terminated!')
else:
self._terminated = True
def consume_and_terminate(self, value):
with self._lock:
if self._consumed:
raise ValueError('Unary consumer already consumed!')
elif self._terminated:
raise ValueError('Unary consumer already terminated!')
else:
self._consumed = True
self._terminated = True
self._action(value)
class _UnaryUnaryAdaptation(object):
def __init__(self, unary_unary_test_method):
self._method = unary_unary_test_method
def service(self, response_consumer, context, control):
def action(request):
self._method.service(request,
response_consumer.consume_and_terminate,
context, control)
return _UnaryConsumer(action)
class _UnaryStreamAdaptation(object):
def __init__(self, unary_stream_test_method):
self._method = unary_stream_test_method
def service(self, response_consumer, context, control):
def action(request):
self._method.service(request, response_consumer, context, control)
return _UnaryConsumer(action)
class _StreamUnaryAdaptation(object):
def __init__(self, stream_unary_test_method):
self._method = stream_unary_test_method
def service(self, response_consumer, context, control):
return self._method.service(response_consumer.consume_and_terminate,
context, control)
class _MultiMethodImplementation(face.MultiMethodImplementation):
def __init__(self, methods, control, pool):
self._methods = methods
self._control = control
self._pool = pool
def service(self, group, name, response_consumer, context):
method = self._methods.get(group, name, None)
if method is None:
raise face.NoSuchMethodError(group, name)
elif self._pool is None:
return method(response_consumer, context, self._control)
else:
request_consumer = method(response_consumer, context, self._control)
return stream_util.ThreadSwitchingConsumer(request_consumer,
self._pool)
class _Assembly(
collections.namedtuple(
'_Assembly',
['methods', 'inlines', 'events', 'adaptations', 'messages'])):
"""An intermediate structure created when creating a TestServiceDigest."""
def _assemble(scenarios, identifiers, inline_method_constructor,
event_method_constructor, adapter, control, pool):
"""Creates an _Assembly from the given scenarios."""
methods = {}
inlines = {}
events = {}
adaptations = {}
messages = {}
for identifier, scenario in six.iteritems(scenarios):
if identifier in identifiers:
raise ValueError('Repeated identifier "(%s, %s)"!' % identifier)
test_method = scenario[0]
inline_method = inline_method_constructor(test_method, control)
event_method = event_method_constructor(test_method, control, pool)
adaptation = adapter(test_method)
methods[identifier] = test_method
inlines[identifier] = inline_method
events[identifier] = event_method
adaptations[identifier] = adaptation
messages[identifier] = scenario[1]
return _Assembly(methods, inlines, events, adaptations, messages)
def digest(service, control, pool):
"""Creates a TestServiceDigest from a TestService.
Args:
service: A _service.TestService.
control: A test_control.Control.
pool: If RPC methods should be serviced in a separate thread, a thread pool.
None if RPC methods should be serviced in the thread belonging to the
run-time that calls for their service.
Returns:
A TestServiceDigest synthesized from the given service.TestService.
"""
identifiers = set()
unary_unary = _assemble(service.unary_unary_scenarios(), identifiers,
_InlineUnaryUnaryMethod, _EventUnaryUnaryMethod,
_UnaryUnaryAdaptation, control, pool)
identifiers.update(unary_unary.inlines)
unary_stream = _assemble(service.unary_stream_scenarios(), identifiers,
_InlineUnaryStreamMethod, _EventUnaryStreamMethod,
_UnaryStreamAdaptation, control, pool)
identifiers.update(unary_stream.inlines)
stream_unary = _assemble(service.stream_unary_scenarios(), identifiers,
_InlineStreamUnaryMethod, _EventStreamUnaryMethod,
_StreamUnaryAdaptation, control, pool)
identifiers.update(stream_unary.inlines)
stream_stream = _assemble(service.stream_stream_scenarios(), identifiers,
_InlineStreamStreamMethod,
_EventStreamStreamMethod, _IDENTITY, control,
pool)
identifiers.update(stream_stream.inlines)
methods = dict(unary_unary.methods)
methods.update(unary_stream.methods)
methods.update(stream_unary.methods)
methods.update(stream_stream.methods)
adaptations = dict(unary_unary.adaptations)
adaptations.update(unary_stream.adaptations)
adaptations.update(stream_unary.adaptations)
adaptations.update(stream_stream.adaptations)
inlines = dict(unary_unary.inlines)
inlines.update(unary_stream.inlines)
inlines.update(stream_unary.inlines)
inlines.update(stream_stream.inlines)
events = dict(unary_unary.events)
events.update(unary_stream.events)
events.update(stream_unary.events)
events.update(stream_stream.events)
return TestServiceDigest(methods, inlines, events,
_MultiMethodImplementation(adaptations, control,
pool),
unary_unary.messages, unary_stream.messages,
stream_unary.messages, stream_stream.messages)

@ -1,508 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test code for the Face layer of RPC Framework."""
from __future__ import division
import abc
import contextlib
import itertools
import threading
import unittest
from concurrent import futures
import six
# test_interfaces is referenced from specification in this module.
from grpc.framework.foundation import future
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.face import face
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
from tests.unit.framework.common import test_coverage
from tests.unit.framework.interfaces.face import _3069_test_constant
from tests.unit.framework.interfaces.face import _digest
from tests.unit.framework.interfaces.face import _stock_service
from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
class _PauseableIterator(object):
def __init__(self, upstream):
self._upstream = upstream
self._condition = threading.Condition()
self._paused = False
@contextlib.contextmanager
def pause(self):
with self._condition:
self._paused = True
yield
with self._condition:
self._paused = False
self._condition.notify_all()
def __iter__(self):
return self
def __next__(self):
return self.next()
def next(self):
with self._condition:
while self._paused:
self._condition.wait()
return next(self._upstream)
class _Callback(object):
def __init__(self):
self._condition = threading.Condition()
self._called = False
self._passed_future = None
self._passed_other_stuff = None
def __call__(self, *args, **kwargs):
with self._condition:
self._called = True
if args:
self._passed_future = args[0]
if 1 < len(args) or kwargs:
self._passed_other_stuff = tuple(args[1:]), dict(kwargs)
self._condition.notify_all()
def future(self):
with self._condition:
while True:
if self._passed_other_stuff is not None:
raise ValueError(
'Test callback passed unexpected values: %s',
self._passed_other_stuff)
elif self._called:
return self._passed_future
else:
self._condition.wait()
class TestCase(
six.with_metaclass(abc.ABCMeta, test_coverage.Coverage,
unittest.TestCase)):
"""A test of the Face layer of RPC Framework.
Concrete subclasses must have an "implementation" attribute of type
test_interfaces.Implementation and an "invoker_constructor" attribute of type
_invocation.InvokerConstructor.
"""
NAME = 'FutureInvocationAsynchronousEventServiceTest'
def setUp(self):
"""See unittest.TestCase.setUp for full specification.
Overriding implementations must call this implementation.
"""
self._control = test_control.PauseFailControl()
self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE)
self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE,
self._control, self._digest_pool)
generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
self._digest.methods, self._digest.event_method_implementations,
None)
self._invoker = self.invoker_constructor.construct_invoker(
generic_stub, dynamic_stubs, self._digest.methods)
def tearDown(self):
"""See unittest.TestCase.tearDown for full specification.
Overriding implementations must call this implementation.
"""
self._invoker = None
self.implementation.destantiate(self._memo)
self._digest_pool.shutdown(wait=True)
def testSuccessfulUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = _Callback()
response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
response_future.add_done_callback(callback)
response = response_future.result()
test_messages.verify(request, response, self)
self.assertIs(callback.future(), response_future)
self.assertIsNone(response_future.exception())
self.assertIsNone(response_future.traceback())
def testSuccessfulUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_stream_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
response_iterator = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
def testSuccessfulStreamRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
request_iterator = _PauseableIterator(iter(requests))
callback = _Callback()
# Use of a paused iterator of requests allows us to test that control is
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_future = self._invoker.future(group, method)(
request_iterator, test_constants.LONG_TIMEOUT)
response_future.add_done_callback(callback)
future_passed_to_callback = callback.future()
response = future_passed_to_callback.result()
test_messages.verify(requests, response, self)
self.assertIs(future_passed_to_callback, response_future)
self.assertIsNone(response_future.exception())
self.assertIsNone(response_future.traceback())
def testSuccessfulStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_stream_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
request_iterator = _PauseableIterator(iter(requests))
# Use of a paused iterator of requests allows us to test that control is
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_iterator = self._invoker.future(group, method)(
request_iterator, test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
def testSequentialInvocations(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
first_request = test_messages.request()
second_request = test_messages.request()
first_response_future = self._invoker.future(group, method)(
first_request, test_constants.LONG_TIMEOUT)
first_response = first_response_future.result()
test_messages.verify(first_request, first_response, self)
second_response_future = self._invoker.future(group, method)(
second_request, test_constants.LONG_TIMEOUT)
second_response = second_response_future.result()
test_messages.verify(second_request, second_response, self)
def testParallelInvocations(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
first_request = test_messages.request()
second_request = test_messages.request()
first_response_future = self._invoker.future(group, method)(
first_request, test_constants.LONG_TIMEOUT)
second_response_future = self._invoker.future(group, method)(
second_request, test_constants.LONG_TIMEOUT)
first_response = first_response_future.result()
second_response = second_response_future.result()
test_messages.verify(first_request, first_response, self)
test_messages.verify(second_request, second_response, self)
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures = []
for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
requests.append(request)
response_futures.append(response_future)
responses = [
response_future.result()
for response_future in response_futures
]
for request, response in zip(requests, responses):
test_messages.verify(request, response, self)
def testWaitingForSomeButNotAllParallelInvocations(self):
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
inner_response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
outer_response_future = pool.submit(
inner_response_future.result)
requests.append(request)
response_futures_to_indices[outer_response_future] = index
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index],
response_future.result(), self)
pool.shutdown(wait=True)
def testCancelledUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = _Callback()
with self._control.pause():
response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
response_future.add_done_callback(callback)
cancel_method_return_value = response_future.cancel()
self.assertIs(callback.future(), response_future)
self.assertFalse(cancel_method_return_value)
self.assertTrue(response_future.cancelled())
with self.assertRaises(future.CancelledError):
response_future.result()
with self.assertRaises(future.CancelledError):
response_future.exception()
with self.assertRaises(future.CancelledError):
response_future.traceback()
def testCancelledUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_stream_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self._control.pause():
response_iterator = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
response_iterator.cancel()
with self.assertRaises(face.CancellationError):
next(response_iterator)
def testCancelledStreamRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = _Callback()
with self._control.pause():
response_future = self._invoker.future(group, method)(
iter(requests), test_constants.LONG_TIMEOUT)
response_future.add_done_callback(callback)
cancel_method_return_value = response_future.cancel()
self.assertIs(callback.future(), response_future)
self.assertFalse(cancel_method_return_value)
self.assertTrue(response_future.cancelled())
with self.assertRaises(future.CancelledError):
response_future.result()
with self.assertRaises(future.CancelledError):
response_future.exception()
with self.assertRaises(future.CancelledError):
response_future.traceback()
def testCancelledStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_stream_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self._control.pause():
response_iterator = self._invoker.future(group, method)(
iter(requests), test_constants.LONG_TIMEOUT)
response_iterator.cancel()
with self.assertRaises(face.CancellationError):
next(response_iterator)
def testExpiredUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = _Callback()
with self._control.pause():
response_future = self._invoker.future(group, method)(
request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
response_future.add_done_callback(callback)
self.assertIs(callback.future(), response_future)
self.assertIsInstance(response_future.exception(),
face.ExpirationError)
with self.assertRaises(face.ExpirationError):
response_future.result()
self.assertIsInstance(response_future.exception(),
face.AbortionError)
self.assertIsNotNone(response_future.traceback())
def testExpiredUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_stream_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
with self._control.pause():
response_iterator = self._invoker.future(group, method)(
request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = _Callback()
with self._control.pause():
response_future = self._invoker.future(
group, method)(iter(requests),
_3069_test_constant.REALLY_SHORT_TIMEOUT)
response_future.add_done_callback(callback)
self.assertIs(callback.future(), response_future)
self.assertIsInstance(response_future.exception(),
face.ExpirationError)
with self.assertRaises(face.ExpirationError):
response_future.result()
self.assertIsInstance(response_future.exception(),
face.AbortionError)
self.assertIsNotNone(response_future.traceback())
def testExpiredStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_stream_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
with self._control.pause():
response_iterator = self._invoker.future(
group, method)(iter(requests),
_3069_test_constant.REALLY_SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
callback = _Callback()
abortion_callback = _Callback()
with self._control.fail():
response_future = self._invoker.future(group, method)(
request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
response_future.add_done_callback(callback)
response_future.add_abortion_callback(abortion_callback)
self.assertIs(callback.future(), response_future)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
# indistinguishable from simply not having called its
# response_callback before the expiration of the RPC.
self.assertIsInstance(response_future.exception(),
face.ExpirationError)
with self.assertRaises(face.ExpirationError):
response_future.result()
self.assertIsNotNone(response_future.traceback())
self.assertIsNotNone(abortion_callback.future())
def testFailedUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.unary_stream_messages_sequences)):
for test_messages in test_messages_sequence:
request = test_messages.request()
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is indistinguishable
# from simply not having called its response_consumer before the
# expiration of the RPC.
with self._control.fail(), self.assertRaises(
face.ExpirationError):
response_iterator = self._invoker.future(group, method)(
request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
callback = _Callback()
abortion_callback = _Callback()
with self._control.fail():
response_future = self._invoker.future(
group, method)(iter(requests),
_3069_test_constant.REALLY_SHORT_TIMEOUT)
response_future.add_done_callback(callback)
response_future.add_abortion_callback(abortion_callback)
self.assertIs(callback.future(), response_future)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
# indistinguishable from simply not having called its
# response_callback before the expiration of the RPC.
self.assertIsInstance(response_future.exception(),
face.ExpirationError)
with self.assertRaises(face.ExpirationError):
response_future.result()
self.assertIsNotNone(response_future.traceback())
self.assertIsNotNone(abortion_callback.future())
def testFailedStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in (six.iteritems(
self._digest.stream_stream_messages_sequences)):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is indistinguishable
# from simply not having called its response_consumer before the
# expiration of the RPC.
with self._control.fail(), self.assertRaises(
face.ExpirationError):
response_iterator = self._invoker.future(
group, method)(iter(requests),
_3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator)

@ -1,198 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Coverage across the Face layer's generic-to-dynamic range for invocation."""
import abc
import six
from grpc.framework.common import cardinality
_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = {
cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary',
cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary',
cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
}
_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = {
cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary',
cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary',
cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
}
_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = {
cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary',
cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream',
cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary',
cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream',
}
_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = {
cardinality.Cardinality.UNARY_UNARY: 'unary_unary',
cardinality.Cardinality.UNARY_STREAM: 'unary_stream',
cardinality.Cardinality.STREAM_UNARY: 'stream_unary',
cardinality.Cardinality.STREAM_STREAM: 'stream_stream',
}
class Invoker(six.with_metaclass(abc.ABCMeta)):
"""A type used to invoke test RPCs."""
@abc.abstractmethod
def blocking(self, group, name):
"""Invokes an RPC with blocking control flow."""
raise NotImplementedError()
@abc.abstractmethod
def future(self, group, name):
"""Invokes an RPC with future control flow."""
raise NotImplementedError()
@abc.abstractmethod
def event(self, group, name):
"""Invokes an RPC with event control flow."""
raise NotImplementedError()
class InvokerConstructor(six.with_metaclass(abc.ABCMeta)):
"""A type used to create Invokers."""
@abc.abstractmethod
def name(self):
"""Specifies the name of the Invoker constructed by this object."""
raise NotImplementedError()
@abc.abstractmethod
def construct_invoker(self, generic_stub, dynamic_stubs, methods):
"""Constructs an Invoker for the given stubs and methods."""
raise NotImplementedError()
class _GenericInvoker(Invoker):
def __init__(self, generic_stub, methods):
self._stub = generic_stub
self._methods = methods
def _behavior(self, group, name, cardinality_to_generic_method):
method_cardinality = self._methods[group, name].cardinality()
behavior = getattr(self._stub,
cardinality_to_generic_method[method_cardinality])
return lambda *args, **kwargs: behavior(group, name, *args, **kwargs)
def blocking(self, group, name):
return self._behavior(group, name,
_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR)
def future(self, group, name):
return self._behavior(group, name,
_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR)
def event(self, group, name):
return self._behavior(group, name,
_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR)
class _GenericInvokerConstructor(InvokerConstructor):
def name(self):
return 'GenericInvoker'
def construct_invoker(self, generic_stub, dynamic_stub, methods):
return _GenericInvoker(generic_stub, methods)
class _MultiCallableInvoker(Invoker):
def __init__(self, generic_stub, methods):
self._stub = generic_stub
self._methods = methods
def _multi_callable(self, group, name):
method_cardinality = self._methods[group, name].cardinality()
behavior = getattr(
self._stub,
_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
return behavior(group, name)
def blocking(self, group, name):
return self._multi_callable(group, name)
def future(self, group, name):
method_cardinality = self._methods[group, name].cardinality()
behavior = getattr(
self._stub,
_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
if method_cardinality in (cardinality.Cardinality.UNARY_UNARY,
cardinality.Cardinality.STREAM_UNARY):
return behavior(group, name).future
else:
return behavior(group, name)
def event(self, group, name):
return self._multi_callable(group, name).event
class _MultiCallableInvokerConstructor(InvokerConstructor):
def name(self):
return 'MultiCallableInvoker'
def construct_invoker(self, generic_stub, dynamic_stub, methods):
return _MultiCallableInvoker(generic_stub, methods)
class _DynamicInvoker(Invoker):
def __init__(self, dynamic_stubs, methods):
self._stubs = dynamic_stubs
self._methods = methods
def blocking(self, group, name):
return getattr(self._stubs[group], name)
def future(self, group, name):
if self._methods[group, name].cardinality() in (
cardinality.Cardinality.UNARY_UNARY,
cardinality.Cardinality.STREAM_UNARY):
return getattr(self._stubs[group], name).future
else:
return getattr(self._stubs[group], name)
def event(self, group, name):
return getattr(self._stubs[group], name).event
class _DynamicInvokerConstructor(InvokerConstructor):
def name(self):
return 'DynamicInvoker'
def construct_invoker(self, generic_stub, dynamic_stubs, methods):
return _DynamicInvoker(dynamic_stubs, methods)
def invoker_constructors():
"""Creates a sequence of InvokerConstructors to use in tests of RPCs.
Returns:
A sequence of InvokerConstructors.
"""
return (
_GenericInvokerConstructor(),
_MultiCallableInvokerConstructor(),
_DynamicInvokerConstructor(),
)

@ -1,304 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Private interfaces implemented by data sets used in Face-layer tests."""
import abc
import six
# face is referenced from specification in this module.
from grpc.framework.interfaces.face import face # pylint: disable=unused-import
from tests.unit.framework.interfaces.face import test_interfaces
class UnaryUnaryTestMethodImplementation(
six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
"""A controllable implementation of a unary-unary method."""
@abc.abstractmethod
def service(self, request, response_callback, context, control):
"""Services an RPC that accepts one message and produces one message.
Args:
request: The single request message for the RPC.
response_callback: A callback to be called to accept the response message
of the RPC.
context: An face.ServicerContext object.
control: A test_control.Control to control execution of this method.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
"""A type for unary-request-unary-response message pairings."""
@abc.abstractmethod
def request(self):
"""Affords a request message.
Implementations of this method should return a different message with each
call so that multiple test executions of the test method may be made with
different inputs.
Returns:
A request message.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify(self, request, response, test_case):
"""Verifies that the computed response matches the given request.
Args:
request: A request message.
response: A response message.
test_case: A unittest.TestCase object affording useful assertion methods.
Raises:
AssertionError: If the request and response do not match, indicating that
there was some problem executing the RPC under test.
"""
raise NotImplementedError()
class UnaryStreamTestMethodImplementation(
six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
"""A controllable implementation of a unary-stream method."""
@abc.abstractmethod
def service(self, request, response_consumer, context, control):
"""Services an RPC that takes one message and produces a stream of messages.
Args:
request: The single request message for the RPC.
response_consumer: A stream.Consumer to be called to accept the response
messages of the RPC.
context: A face.ServicerContext object.
control: A test_control.Control to control execution of this method.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
"""A type for unary-request-stream-response message pairings."""
@abc.abstractmethod
def request(self):
"""Affords a request message.
Implementations of this method should return a different message with each
call so that multiple test executions of the test method may be made with
different inputs.
Returns:
A request message.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify(self, request, responses, test_case):
"""Verifies that the computed responses match the given request.
Args:
request: A request message.
responses: A sequence of response messages.
test_case: A unittest.TestCase object affording useful assertion methods.
Raises:
AssertionError: If the request and responses do not match, indicating that
there was some problem executing the RPC under test.
"""
raise NotImplementedError()
class StreamUnaryTestMethodImplementation(
six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
"""A controllable implementation of a stream-unary method."""
@abc.abstractmethod
def service(self, response_callback, context, control):
"""Services an RPC that takes a stream of messages and produces one message.
Args:
response_callback: A callback to be called to accept the response message
of the RPC.
context: A face.ServicerContext object.
control: A test_control.Control to control execution of this method.
Returns:
A stream.Consumer with which to accept the request messages of the RPC.
The consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing messages to this object. Implementations must not assume that
this object will be called to completion of the request stream or even
called at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
"""A type for stream-request-unary-response message pairings."""
@abc.abstractmethod
def requests(self):
"""Affords a sequence of request messages.
Implementations of this method should return a different sequences with each
call so that multiple test executions of the test method may be made with
different inputs.
Returns:
A sequence of request messages.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify(self, requests, response, test_case):
"""Verifies that the computed response matches the given requests.
Args:
requests: A sequence of request messages.
response: A response message.
test_case: A unittest.TestCase object affording useful assertion methods.
Raises:
AssertionError: If the requests and response do not match, indicating that
there was some problem executing the RPC under test.
"""
raise NotImplementedError()
class StreamStreamTestMethodImplementation(
six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
"""A controllable implementation of a stream-stream method."""
@abc.abstractmethod
def service(self, response_consumer, context, control):
"""Services an RPC that accepts and produces streams of messages.
Args:
response_consumer: A stream.Consumer to be called to accept the response
messages of the RPC.
context: A face.ServicerContext object.
control: A test_control.Control to control execution of this method.
Returns:
A stream.Consumer with which to accept the request messages of the RPC.
The consumer returned from this method may or may not be invoked to
completion: in the case of RPC abortion, RPC Framework will simply stop
passing messages to this object. Implementations must not assume that
this object will be called to completion of the request stream or even
called at all.
Raises:
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
raise NotImplementedError()
class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
"""A type for stream-request-stream-response message pairings."""
@abc.abstractmethod
def requests(self):
"""Affords a sequence of request messages.
Implementations of this method should return a different sequences with each
call so that multiple test executions of the test method may be made with
different inputs.
Returns:
A sequence of request messages.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify(self, requests, responses, test_case):
"""Verifies that the computed response matches the given requests.
Args:
requests: A sequence of request messages.
responses: A sequence of response messages.
test_case: A unittest.TestCase object affording useful assertion methods.
Raises:
AssertionError: If the requests and responses do not match, indicating
that there was some problem executing the RPC under test.
"""
raise NotImplementedError()
class TestService(six.with_metaclass(abc.ABCMeta)):
"""A specification of implemented methods to use in tests."""
@abc.abstractmethod
def unary_unary_scenarios(self):
"""Affords unary-request-unary-response test methods and their messages.
Returns:
A dict from method group-name pair to implementation/messages pair. The
first element of the pair is a UnaryUnaryTestMethodImplementation object
and the second element is a sequence of UnaryUnaryTestMethodMessages
objects.
"""
raise NotImplementedError()
@abc.abstractmethod
def unary_stream_scenarios(self):
"""Affords unary-request-stream-response test methods and their messages.
Returns:
A dict from method group-name pair to implementation/messages pair. The
first element of the pair is a UnaryStreamTestMethodImplementation
object and the second element is a sequence of
UnaryStreamTestMethodMessages objects.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_unary_scenarios(self):
"""Affords stream-request-unary-response test methods and their messages.
Returns:
A dict from method group-name pair to implementation/messages pair. The
first element of the pair is a StreamUnaryTestMethodImplementation
object and the second element is a sequence of
StreamUnaryTestMethodMessages objects.
"""
raise NotImplementedError()
@abc.abstractmethod
def stream_stream_scenarios(self):
"""Affords stream-request-stream-response test methods and their messages.
Returns:
A dict from method group-name pair to implementation/messages pair. The
first element of the pair is a StreamStreamTestMethodImplementation
object and the second element is a sequence of
StreamStreamTestMethodMessages objects.
"""
raise NotImplementedError()

@ -1,390 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Examples of Python implementations of the stock.proto Stock service."""
from grpc.framework.common import cardinality
from grpc.framework.foundation import abandonment
from grpc.framework.foundation import stream
from tests.unit.framework.common import test_constants
from tests.unit.framework.interfaces.face import _service
from tests.unit._junkdrawer import stock_pb2
_STOCK_GROUP_NAME = 'Stock'
_SYMBOL_FORMAT = 'test symbol:%03d'
# A test-appropriate security-pricing function. :-P
_price = lambda symbol_name: float(hash(symbol_name) % 4096)
def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
"""A unary-request, unary-response test method."""
control.control()
if active():
stock_reply_callback(
stock_pb2.StockReply(
symbol=stock_request.symbol,
price=_price(stock_request.symbol)))
else:
raise abandonment.Abandoned()
def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
"""A stream-request, stream-response test method."""
def stock_reply_for_stock_request(stock_request):
control.control()
if active():
return stock_pb2.StockReply(
symbol=stock_request.symbol, price=_price(stock_request.symbol))
else:
raise abandonment.Abandoned()
class StockRequestConsumer(stream.Consumer):
def consume(self, stock_request):
stock_reply_consumer.consume(
stock_reply_for_stock_request(stock_request))
def terminate(self):
control.control()
stock_reply_consumer.terminate()
def consume_and_terminate(self, stock_request):
stock_reply_consumer.consume_and_terminate(
stock_reply_for_stock_request(stock_request))
return StockRequestConsumer()
def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
"""A unary-request, stream-response test method."""
base_price = _price(stock_request.symbol)
for index in range(stock_request.num_trades_to_watch):
control.control()
if active():
stock_reply_consumer.consume(
stock_pb2.StockReply(
symbol=stock_request.symbol, price=base_price + index))
else:
raise abandonment.Abandoned()
stock_reply_consumer.terminate()
def _get_highest_trade_price(stock_reply_callback, control, active):
"""A stream-request, unary-response test method."""
class StockRequestConsumer(stream.Consumer):
"""Keeps an ongoing record of the most valuable symbol yet consumed."""
def __init__(self):
self._symbol = None
self._price = None
def consume(self, stock_request):
control.control()
if active():
if self._price is None:
self._symbol = stock_request.symbol
self._price = _price(stock_request.symbol)
else:
candidate_price = _price(stock_request.symbol)
if self._price < candidate_price:
self._symbol = stock_request.symbol
self._price = candidate_price
def terminate(self):
control.control()
if active():
if self._symbol is None:
raise ValueError()
else:
stock_reply_callback(
stock_pb2.StockReply(
symbol=self._symbol, price=self._price))
self._symbol = None
self._price = None
def consume_and_terminate(self, stock_request):
control.control()
if active():
if self._price is None:
stock_reply_callback(
stock_pb2.StockReply(
symbol=stock_request.symbol,
price=_price(stock_request.symbol)))
else:
candidate_price = _price(stock_request.symbol)
if self._price < candidate_price:
stock_reply_callback(
stock_pb2.StockReply(
symbol=stock_request.symbol,
price=candidate_price))
else:
stock_reply_callback(
stock_pb2.StockReply(
symbol=self._symbol, price=self._price))
self._symbol = None
self._price = None
return StockRequestConsumer()
class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation):
"""GetLastTradePrice for use in tests."""
def group(self):
return _STOCK_GROUP_NAME
def name(self):
return 'GetLastTradePrice'
def cardinality(self):
return cardinality.Cardinality.UNARY_UNARY
def request_class(self):
return stock_pb2.StockRequest
def response_class(self):
return stock_pb2.StockReply
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, serialized_request):
return stock_pb2.StockRequest.FromString(serialized_request)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, serialized_response):
return stock_pb2.StockReply.FromString(serialized_response)
def service(self, request, response_callback, context, control):
_get_last_trade_price(request, response_callback, control,
context.is_active)
class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages):
def __init__(self):
self._index = 0
def request(self):
symbol = _SYMBOL_FORMAT % self._index
self._index += 1
return stock_pb2.StockRequest(symbol=symbol)
def verify(self, request, response, test_case):
test_case.assertEqual(request.symbol, response.symbol)
test_case.assertEqual(_price(request.symbol), response.price)
class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation):
"""GetLastTradePriceMultiple for use in tests."""
def group(self):
return _STOCK_GROUP_NAME
def name(self):
return 'GetLastTradePriceMultiple'
def cardinality(self):
return cardinality.Cardinality.STREAM_STREAM
def request_class(self):
return stock_pb2.StockRequest
def response_class(self):
return stock_pb2.StockReply
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, serialized_request):
return stock_pb2.StockRequest.FromString(serialized_request)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, serialized_response):
return stock_pb2.StockReply.FromString(serialized_response)
def service(self, response_consumer, context, control):
return _get_last_trade_price_multiple(response_consumer, control,
context.is_active)
class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages):
"""Pairs of message streams for use with GetLastTradePriceMultiple."""
def __init__(self):
self._index = 0
def requests(self):
base_index = self._index
self._index += 1
return [
stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index))
for index in range(test_constants.STREAM_LENGTH)
]
def verify(self, requests, responses, test_case):
test_case.assertEqual(len(requests), len(responses))
for stock_request, stock_reply in zip(requests, responses):
test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
test_case.assertEqual(
_price(stock_request.symbol), stock_reply.price)
class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation):
"""WatchFutureTrades for use in tests."""
def group(self):
return _STOCK_GROUP_NAME
def name(self):
return 'WatchFutureTrades'
def cardinality(self):
return cardinality.Cardinality.UNARY_STREAM
def request_class(self):
return stock_pb2.StockRequest
def response_class(self):
return stock_pb2.StockReply
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, serialized_request):
return stock_pb2.StockRequest.FromString(serialized_request)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, serialized_response):
return stock_pb2.StockReply.FromString(serialized_response)
def service(self, request, response_consumer, context, control):
_watch_future_trades(request, response_consumer, control,
context.is_active)
class WatchFutureTradesMessages(_service.UnaryStreamTestMessages):
"""Pairs of a single request message and a sequence of response messages."""
def __init__(self):
self._index = 0
def request(self):
symbol = _SYMBOL_FORMAT % self._index
self._index += 1
return stock_pb2.StockRequest(
symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH)
def verify(self, request, responses, test_case):
test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses))
base_price = _price(request.symbol)
for index, response in enumerate(responses):
test_case.assertEqual(base_price + index, response.price)
class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation):
"""GetHighestTradePrice for use in tests."""
def group(self):
return _STOCK_GROUP_NAME
def name(self):
return 'GetHighestTradePrice'
def cardinality(self):
return cardinality.Cardinality.STREAM_UNARY
def request_class(self):
return stock_pb2.StockRequest
def response_class(self):
return stock_pb2.StockReply
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, serialized_request):
return stock_pb2.StockRequest.FromString(serialized_request)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, serialized_response):
return stock_pb2.StockReply.FromString(serialized_response)
def service(self, response_callback, context, control):
return _get_highest_trade_price(response_callback, control,
context.is_active)
class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages):
def requests(self):
return [
stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index)
for index in range(test_constants.STREAM_LENGTH)
]
def verify(self, requests, response, test_case):
price = None
symbol = None
for stock_request in requests:
current_symbol = stock_request.symbol
current_price = _price(current_symbol)
if price is None or price < current_price:
price = current_price
symbol = current_symbol
test_case.assertEqual(price, response.price)
test_case.assertEqual(symbol, response.symbol)
class StockTestService(_service.TestService):
"""A corpus of test data with one method of each RPC cardinality."""
def unary_unary_scenarios(self):
return {
(_STOCK_GROUP_NAME, 'GetLastTradePrice'):
(GetLastTradePrice(), [GetLastTradePriceMessages()]),
}
def unary_stream_scenarios(self):
return {
(_STOCK_GROUP_NAME, 'WatchFutureTrades'):
(WatchFutureTrades(), [WatchFutureTradesMessages()]),
}
def stream_unary_scenarios(self):
return {
(_STOCK_GROUP_NAME, 'GetHighestTradePrice'):
(GetHighestTradePrice(), [GetHighestTradePriceMessages()])
}
def stream_stream_scenarios(self):
return {
(_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'):
(GetLastTradePriceMultiple(),
[GetLastTradePriceMultipleMessages()]),
}
STOCK_TEST_SERVICE = StockTestService()

@ -1,53 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tools for creating tests of implementations of the Face layer."""
# unittest is referenced from specification in this module.
import unittest # pylint: disable=unused-import
# test_interfaces is referenced from specification in this module.
from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service
from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service
from tests.unit.framework.interfaces.face import _invocation
from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
_TEST_CASE_SUPERCLASSES = (
_blocking_invocation_inline_service.TestCase,
_future_invocation_asynchronous_event_service.TestCase,
)
def test_cases(implementation):
"""Creates unittest.TestCase classes for a given Face layer implementation.
Args:
implementation: A test_interfaces.Implementation specifying creation and
destruction of a given Face layer implementation.
Returns:
A sequence of subclasses of unittest.TestCase defining tests of the
specified Face layer implementation.
"""
test_case_classes = []
for invoker_constructor in _invocation.invoker_constructors():
for super_class in _TEST_CASE_SUPERCLASSES:
test_case_classes.append(
type(
invoker_constructor.name() + super_class.NAME,
(super_class,), {
'implementation': implementation,
'invoker_constructor': invoker_constructor,
'__module__': implementation.__module__,
}))
return test_case_classes

@ -1,212 +0,0 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interfaces used in tests of implementations of the Face layer."""
import abc
import six
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.interfaces.face import face # pylint: disable=unused-import
class Method(six.with_metaclass(abc.ABCMeta)):
"""Specifies a method to be used in tests."""
@abc.abstractmethod
def group(self):
"""Identify the group of the method.
Returns:
The group of the method.
"""
raise NotImplementedError()
@abc.abstractmethod
def name(self):
"""Identify the name of the method.
Returns:
The name of the method.
"""
raise NotImplementedError()
@abc.abstractmethod
def cardinality(self):
"""Identify the cardinality of the method.
Returns:
A cardinality.Cardinality value describing the streaming semantics of the
method.
"""
raise NotImplementedError()
@abc.abstractmethod
def request_class(self):
"""Identify the class used for the method's request objects.
Returns:
The class object of the class to which the method's request objects
belong.
"""
raise NotImplementedError()
@abc.abstractmethod
def response_class(self):
"""Identify the class used for the method's response objects.
Returns:
The class object of the class to which the method's response objects
belong.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_request(self, request):
"""Serialize the given request object.
Args:
request: A request object appropriate for this method.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_request(self, serialized_request):
"""Synthesize a request object from a given bytestring.
Args:
serialized_request: A bytestring deserializable into a request object
appropriate for this method.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_response(self, response):
"""Serialize the given response object.
Args:
response: A response object appropriate for this method.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_response(self, serialized_response):
"""Synthesize a response object from a given bytestring.
Args:
serialized_response: A bytestring deserializable into a response object
appropriate for this method.
"""
raise NotImplementedError()
class Implementation(six.with_metaclass(abc.ABCMeta)):
"""Specifies an implementation of the Face layer."""
@abc.abstractmethod
def instantiate(self, methods, method_implementations,
multi_method_implementation):
"""Instantiates the Face layer implementation to be used in a test.
Args:
methods: A sequence of Method objects describing the methods available to
be called during the test.
method_implementations: A dictionary from group-name pair to
face.MethodImplementation object specifying implementation of a method.
multi_method_implementation: A face.MultiMethodImplementation or None.
Returns:
A sequence of length three the first element of which is a
face.GenericStub, the second element of which is dictionary from groups
to face.DynamicStubs affording invocation of the group's methods, and
the third element of which is an arbitrary memo object to be kept and
passed to destantiate at the conclusion of the test. The returned stubs
must be backed by the provided implementations.
"""
raise NotImplementedError()
@abc.abstractmethod
def destantiate(self, memo):
"""Destroys the Face layer implementation under test.
Args:
memo: The object from the third position of the return value of a call to
instantiate.
"""
raise NotImplementedError()
@abc.abstractmethod
def invocation_metadata(self):
"""Provides the metadata to be used when invoking a test RPC.
Returns:
An object to use as the supplied-at-invocation-time metadata in a test
RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def initial_metadata(self):
"""Provides the metadata for use as a test RPC's first servicer metadata.
Returns:
An object to use as the from-the-servicer-before-responses metadata in a
test RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def terminal_metadata(self):
"""Provides the metadata for use as a test RPC's second servicer metadata.
Returns:
An object to use as the from-the-servicer-after-all-responses metadata in
a test RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def code(self):
"""Provides the value for use as a test RPC's code.
Returns:
An object to use as the from-the-servicer code in a test RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def details(self):
"""Provides the value for use as a test RPC's details.
Returns:
An object to use as the from-the-servicer details in a test RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def metadata_transmitted(self, original_metadata, transmitted_metadata):
"""Identifies whether or not metadata was properly transmitted.
Args:
original_metadata: A metadata value passed to the Face interface
implementation under test.
transmitted_metadata: The same metadata value after having been
transmitted via an RPC performed by the Face interface implementation
under test.
Returns:
Whether or not the metadata was properly transmitted by the Face interface
implementation under test.
"""
raise NotImplementedError()
Loading…
Cancel
Save