gRPC Python test infrastructure

(The channel-related second part of it.)
pull/11964/head
Nathaniel Manista 7 years ago
parent 69b7231776
commit 2010985ab2
  1. 3
      .pylintrc
  2. 289
      src/python/grpcio_testing/grpc_testing/__init__.py
  3. 23
      src/python/grpcio_testing/grpc_testing/_channel/__init__.py
  4. 62
      src/python/grpcio_testing/grpc_testing/_channel/_channel.py
  5. 119
      src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py
  6. 48
      src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py
  7. 322
      src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
  8. 115
      src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py
  9. 193
      src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py
  10. 92
      src/python/grpcio_testing/grpc_testing/_common.py
  11. 4
      src/python/grpcio_tests/setup.py
  12. 36
      src/python/grpcio_tests/tests/testing/_application_common.py
  13. 33
      src/python/grpcio_tests/tests/testing/_application_testing_common.py
  14. 260
      src/python/grpcio_tests/tests/testing/_client_application.py
  15. 306
      src/python/grpcio_tests/tests/testing/_client_test.py
  16. 13
      src/python/grpcio_tests/tests/testing/proto/__init__.py
  17. 29
      src/python/grpcio_tests/tests/testing/proto/requests.proto
  18. 42
      src/python/grpcio_tests/tests/testing/proto/services.proto
  19. 1
      src/python/grpcio_tests/tests/tests.json

@ -38,6 +38,9 @@ disable=
# TODO(https://github.com/grpc/grpc/issues/261): This doesn't seem to
# work for now? Try with a later pylint?
locally-disabled,
# NOTE(nathaniel): What even is this? *Enabling* an inspection results
# in a warning? How does that encourage more analysis and coverage?
locally-enabled,
# NOTE(nathaniel): We don't write doc strings for most private code
# elements.
missing-docstring,

@ -15,11 +15,284 @@
import abc
from google.protobuf import descriptor
import six
import grpc
class UnaryUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
"""Fixture for a unary-unary RPC invoked by a system under test.
Enables users to "play server" for the RPC.
"""
@abc.abstractmethod
def send_initial_metadata(self, initial_metadata):
"""Sends the RPC's initial metadata to the system under test.
Args:
initial_metadata: The RPC's initial metadata to be "sent" to
the system under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def cancelled(self):
"""Blocks until the system under test has cancelled the RPC."""
raise NotImplementedError()
@abc.abstractmethod
def terminate(self, response, trailing_metadata, code, details):
"""Terminates the RPC.
Args:
response: The response for the RPC.
trailing_metadata: The RPC's trailing metadata.
code: The RPC's status code.
details: The RPC's status details.
"""
raise NotImplementedError()
class UnaryStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
"""Fixture for a unary-stream RPC invoked by a system under test.
Enables users to "play server" for the RPC.
"""
@abc.abstractmethod
def send_initial_metadata(self, initial_metadata):
"""Sends the RPC's initial metadata to the system under test.
Args:
initial_metadata: The RPC's initial metadata to be "sent" to
the system under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def send_response(self, response):
"""Sends a response to the system under test.
Args:
response: A response message to be "sent" to the system under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def cancelled(self):
"""Blocks until the system under test has cancelled the RPC."""
raise NotImplementedError()
@abc.abstractmethod
def terminate(self, trailing_metadata, code, details):
"""Terminates the RPC.
Args:
trailing_metadata: The RPC's trailing metadata.
code: The RPC's status code.
details: The RPC's status details.
"""
raise NotImplementedError()
class StreamUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
"""Fixture for a stream-unary RPC invoked by a system under test.
Enables users to "play server" for the RPC.
"""
@abc.abstractmethod
def send_initial_metadata(self, initial_metadata):
"""Sends the RPC's initial metadata to the system under test.
Args:
initial_metadata: The RPC's initial metadata to be "sent" to
the system under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def take_request(self):
"""Draws one of the requests added to the RPC by the system under test.
This method blocks until the system under test has added to the RPC
the request to be returned.
Successive calls to this method return requests in the same order in
which the system under test added them to the RPC.
Returns:
A request message added to the RPC by the system under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def requests_closed(self):
"""Blocks until the system under test has closed the request stream."""
raise NotImplementedError()
@abc.abstractmethod
def cancelled(self):
"""Blocks until the system under test has cancelled the RPC."""
raise NotImplementedError()
@abc.abstractmethod
def terminate(self, response, trailing_metadata, code, details):
"""Terminates the RPC.
Args:
response: The response for the RPC.
trailing_metadata: The RPC's trailing metadata.
code: The RPC's status code.
details: The RPC's status details.
"""
raise NotImplementedError()
class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
"""Fixture for a stream-stream RPC invoked by a system under test.
Enables users to "play server" for the RPC.
"""
@abc.abstractmethod
def send_initial_metadata(self, initial_metadata):
"""Sends the RPC's initial metadata to the system under test.
Args:
initial_metadata: The RPC's initial metadata to be "sent" to the
system under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def take_request(self):
"""Draws one of the requests added to the RPC by the system under test.
This method blocks until the system under test has added to the RPC
the request to be returned.
Successive calls to this method return requests in the same order in
which the system under test added them to the RPC.
Returns:
A request message added to the RPC by the system under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def send_response(self, response):
"""Sends a response to the system under test.
Args:
response: A response messages to be "sent" to the system under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def requests_closed(self):
"""Blocks until the system under test has closed the request stream."""
raise NotImplementedError()
@abc.abstractmethod
def cancelled(self):
"""Blocks until the system under test has cancelled the RPC."""
raise NotImplementedError()
@abc.abstractmethod
def terminate(self, trailing_metadata, code, details):
"""Terminates the RPC.
Args:
trailing_metadata: The RPC's trailing metadata.
code: The RPC's status code.
details: The RPC's status details.
"""
raise NotImplementedError()
class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel):
"""A grpc.Channel double with which to test a system that invokes RPCs."""
@abc.abstractmethod
def take_unary_unary(self, method_descriptor):
"""Draws an RPC currently being made by the system under test.
If the given descriptor does not identify any RPC currently being made
by the system under test, this method blocks until the system under
test invokes such an RPC.
Args:
method_descriptor: A descriptor.MethodDescriptor describing a
unary-unary RPC method.
Returns:
A (invocation_metadata, request, unary_unary_channel_rpc) tuple of
the RPC's invocation metadata, its request, and a
UnaryUnaryChannelRpc with which to "play server" for the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def take_unary_stream(self, method_descriptor):
"""Draws an RPC currently being made by the system under test.
If the given descriptor does not identify any RPC currently being made
by the system under test, this method blocks until the system under
test invokes such an RPC.
Args:
method_descriptor: A descriptor.MethodDescriptor describing a
unary-stream RPC method.
Returns:
A (invocation_metadata, request, unary_stream_channel_rpc) tuple of
the RPC's invocation metadata, its request, and a
UnaryStreamChannelRpc with which to "play server" for the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def take_stream_unary(self, method_descriptor):
"""Draws an RPC currently being made by the system under test.
If the given descriptor does not identify any RPC currently being made
by the system under test, this method blocks until the system under
test invokes such an RPC.
Args:
method_descriptor: A descriptor.MethodDescriptor describing a
stream-unary RPC method.
Returns:
A (invocation_metadata, stream_unary_channel_rpc) tuple of the RPC's
invocation metadata and a StreamUnaryChannelRpc with which to "play
server" for the RPC.
"""
raise NotImplementedError()
@abc.abstractmethod
def take_stream_stream(self, method_descriptor):
"""Draws an RPC currently being made by the system under test.
If the given descriptor does not identify any RPC currently being made
by the system under test, this method blocks until the system under
test invokes such an RPC.
Args:
method_descriptor: A descriptor.MethodDescriptor describing a
stream-stream RPC method.
Returns:
A (invocation_metadata, stream_stream_channel_rpc) tuple of the RPC's
invocation metadata and a StreamStreamChannelRpc with which to
"play server" for the RPC.
"""
raise NotImplementedError()
class Time(six.with_metaclass(abc.ABCMeta)):
"""A simulation of time.
@ -117,3 +390,19 @@ def strict_fake_time(now):
"""
from grpc_testing import _time
return _time.StrictFakeTime(now)
def channel(service_descriptors, time):
"""Creates a Channel for use in tests of a gRPC Python-using system.
Args:
service_descriptors: An iterable of descriptor.ServiceDescriptors
describing the RPCs that will be made on the returned Channel by the
system under test.
time: A Time to be used for tests.
Returns:
A Channel for use in tests.
"""
from grpc_testing import _channel
return _channel.testing_channel(service_descriptors, time)

@ -0,0 +1,23 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from grpc_testing._channel import _channel
from grpc_testing._channel import _channel_state
# descriptors is reserved for later use.
# pylint: disable=unused-argument
def testing_channel(descriptors, time):
return _channel.TestingChannel(time, _channel_state.State())
# pylint: enable=unused-argument

@ -0,0 +1,62 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc_testing
from grpc_testing._channel import _channel_rpc
from grpc_testing._channel import _multi_callable
# All serializer and deserializer parameters are not (yet) used by this
# test infrastructure.
# pylint: disable=unused-argument
class TestingChannel(grpc_testing.Channel):
def __init__(self, time, state):
self._time = time
self._state = state
def subscribe(self, callback, try_to_connect=False):
raise NotImplementedError()
def unsubscribe(self, callback):
raise NotImplementedError()
def unary_unary(
self, method, request_serializer=None, response_deserializer=None):
return _multi_callable.UnaryUnary(method, self._state)
def unary_stream(
self, method, request_serializer=None, response_deserializer=None):
return _multi_callable.UnaryStream(method, self._state)
def stream_unary(
self, method, request_serializer=None, response_deserializer=None):
return _multi_callable.StreamUnary(method, self._state)
def stream_stream(
self, method, request_serializer=None, response_deserializer=None):
return _multi_callable.StreamStream(method, self._state)
def take_unary_unary(self, method_descriptor):
return _channel_rpc.unary_unary(self._state, method_descriptor)
def take_unary_stream(self, method_descriptor):
return _channel_rpc.unary_stream(self._state, method_descriptor)
def take_stream_unary(self, method_descriptor):
return _channel_rpc.stream_unary(self._state, method_descriptor)
def take_stream_stream(self, method_descriptor):
return _channel_rpc.stream_stream(self._state, method_descriptor)
# pylint: enable=unused-argument

@ -0,0 +1,119 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc_testing
class _UnaryUnary(grpc_testing.UnaryUnaryChannelRpc):
def __init__(self, rpc_state):
self._rpc_state = rpc_state
def send_initial_metadata(self, initial_metadata):
self._rpc_state.send_initial_metadata(initial_metadata)
def cancelled(self):
self._rpc_state.cancelled()
def terminate(self, response, trailing_metadata, code, details):
self._rpc_state.terminate_with_response(
response, trailing_metadata, code, details)
class _UnaryStream(grpc_testing.UnaryStreamChannelRpc):
def __init__(self, rpc_state):
self._rpc_state = rpc_state
def send_initial_metadata(self, initial_metadata):
self._rpc_state.send_initial_metadata(initial_metadata)
def send_response(self, response):
self._rpc_state.send_response(response)
def cancelled(self):
self._rpc_state.cancelled()
def terminate(self, trailing_metadata, code, details):
self._rpc_state.terminate(trailing_metadata, code, details)
class _StreamUnary(grpc_testing.StreamUnaryChannelRpc):
def __init__(self, rpc_state):
self._rpc_state = rpc_state
def send_initial_metadata(self, initial_metadata):
self._rpc_state.send_initial_metadata(initial_metadata)
def take_request(self):
return self._rpc_state.take_request()
def requests_closed(self):
return self._rpc_state.requests_closed()
def cancelled(self):
self._rpc_state.cancelled()
def terminate(self, response, trailing_metadata, code, details):
self._rpc_state.terminate_with_response(
response, trailing_metadata, code, details)
class _StreamStream(grpc_testing.StreamStreamChannelRpc):
def __init__(self, rpc_state):
self._rpc_state = rpc_state
def send_initial_metadata(self, initial_metadata):
self._rpc_state.send_initial_metadata(initial_metadata)
def take_request(self):
return self._rpc_state.take_request()
def send_response(self, response):
self._rpc_state.send_response(response)
def requests_closed(self):
return self._rpc_state.requests_closed()
def cancelled(self):
self._rpc_state.cancelled()
def terminate(self, trailing_metadata, code, details):
self._rpc_state.terminate(trailing_metadata, code, details)
def unary_unary(channel_state, method_descriptor):
rpc_state = channel_state.take_rpc_state(method_descriptor)
invocation_metadata, request = (
rpc_state.take_invocation_metadata_and_request())
return invocation_metadata, request, _UnaryUnary(rpc_state)
def unary_stream(channel_state, method_descriptor):
rpc_state = channel_state.take_rpc_state(method_descriptor)
invocation_metadata, request = (
rpc_state.take_invocation_metadata_and_request())
return invocation_metadata, request, _UnaryStream(rpc_state)
def stream_unary(channel_state, method_descriptor):
rpc_state = channel_state.take_rpc_state(method_descriptor)
return rpc_state.take_invocation_metadata(), _StreamUnary(rpc_state)
def stream_stream(channel_state, method_descriptor):
rpc_state = channel_state.take_rpc_state(method_descriptor)
return rpc_state.take_invocation_metadata(), _StreamStream(rpc_state)

@ -0,0 +1,48 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import threading
from grpc_testing import _common
from grpc_testing._channel import _rpc_state
class State(_common.ChannelHandler):
def __init__(self):
self._condition = threading.Condition()
self._rpc_states = collections.defaultdict(list)
def invoke_rpc(
self, method_full_rpc_name, invocation_metadata, requests,
requests_closed, timeout):
rpc_state = _rpc_state.State(
invocation_metadata, requests, requests_closed)
with self._condition:
self._rpc_states[method_full_rpc_name].append(rpc_state)
self._condition.notify_all()
return rpc_state
def take_rpc_state(self, method_descriptor):
method_full_rpc_name = '/{}/{}'.format(
method_descriptor.containing_service.full_name,
method_descriptor.name)
with self._condition:
while True:
method_rpc_states = self._rpc_states[method_full_rpc_name]
if method_rpc_states:
return method_rpc_states.pop(0)
else:
self._condition.wait()

@ -0,0 +1,322 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
import grpc
_NOT_YET_OBSERVED = object()
def _cancel(handler):
return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')
def _is_active(handler):
return handler.is_active()
def _time_remaining(unused_handler):
raise NotImplementedError()
def _add_callback(handler, callback):
return handler.add_callback(callback)
def _initial_metadata(handler):
return handler.initial_metadata()
def _trailing_metadata(handler):
trailing_metadata, unused_code, unused_details = handler.termination()
return trailing_metadata
def _code(handler):
unused_trailing_metadata, code, unused_details = handler.termination()
return code
def _details(handler):
unused_trailing_metadata, unused_code, details = handler.termination()
return details
class _Call(grpc.Call):
def __init__(self, handler):
self._handler = handler
def cancel(self):
_cancel(self._handler)
def is_active(self):
return _is_active(self._handler)
def time_remaining(self):
return _time_remaining(self._handler)
def add_callback(self, callback):
return _add_callback(self._handler, callback)
def initial_metadata(self):
return _initial_metadata(self._handler)
def trailing_metadata(self):
return _trailing_metadata(self._handler)
def code(self):
return _code(self._handler)
def details(self):
return _details(self._handler)
class _RpcErrorCall(grpc.RpcError, grpc.Call):
def __init__(self, handler):
self._handler = handler
def cancel(self):
_cancel(self._handler)
def is_active(self):
return _is_active(self._handler)
def time_remaining(self):
return _time_remaining(self._handler)
def add_callback(self, callback):
return _add_callback(self._handler, callback)
def initial_metadata(self):
return _initial_metadata(self._handler)
def trailing_metadata(self):
return _trailing_metadata(self._handler)
def code(self):
return _code(self._handler)
def details(self):
return _details(self._handler)
def _next(handler):
read = handler.take_response()
if read.code is None:
return read.response
elif read.code is grpc.StatusCode.OK:
raise StopIteration()
else:
raise _RpcErrorCall(handler)
class _HandlerExtras(object):
def __init__(self):
self.condition = threading.Condition()
self.unary_response = _NOT_YET_OBSERVED
self.cancelled = False
def _with_extras_cancel(handler, extras):
with extras.condition:
if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):
extras.cancelled = True
return True
else:
return False
def _extras_without_cancelled(extras):
with extras.condition:
return extras.cancelled
def _running(handler):
return handler.is_active()
def _done(handler):
return not handler.is_active()
def _with_extras_unary_response(handler, extras):
with extras.condition:
if extras.unary_response is _NOT_YET_OBSERVED:
read = handler.take_response()
if read.code is None:
extras.unary_response = read.response
return read.response
else:
raise _RpcErrorCall(handler)
else:
return extras.unary_response
def _exception(unused_handler):
raise NotImplementedError('TODO!')
def _traceback(unused_handler):
raise NotImplementedError('TODO!')
def _add_done_callback(handler, callback, future):
adapted_callback = lambda: callback(future)
if not handler.add_callback(adapted_callback):
callback(future)
class _FutureCall(grpc.Future, grpc.Call):
def __init__(self, handler, extras):
self._handler = handler
self._extras = extras
def cancel(self):
return _with_extras_cancel(self._handler, self._extras)
def cancelled(self):
return _extras_without_cancelled(self._extras)
def running(self):
return _running(self._handler)
def done(self):
return _done(self._handler)
def result(self):
return _with_extras_unary_response(self._handler, self._extras)
def exception(self):
return _exception(self._handler)
def traceback(self):
return _traceback(self._handler)
def add_done_callback(self, fn):
_add_done_callback(self._handler, fn, self)
def is_active(self):
return _is_active(self._handler)
def time_remaining(self):
return _time_remaining(self._handler)
def add_callback(self, callback):
return _add_callback(self._handler, callback)
def initial_metadata(self):
return _initial_metadata(self._handler)
def trailing_metadata(self):
return _trailing_metadata(self._handler)
def code(self):
return _code(self._handler)
def details(self):
return _details(self._handler)
def consume_requests(request_iterator, handler):
def _consume():
while True:
try:
request = next(request_iterator)
added = handler.add_request(request)
if not added:
break
except StopIteration:
handler.close_requests()
break
except Exception: # pylint: disable=broad-except
details = 'Exception iterating requests!'
logging.exception(details)
handler.cancel(grpc.StatusCode.UNKNOWN, details)
consumption = threading.Thread(target=_consume)
consumption.start()
def blocking_unary_response(handler):
read = handler.take_response()
if read.code is None:
unused_trailing_metadata, code, unused_details = handler.termination()
if code is grpc.StatusCode.OK:
return read.response
else:
raise _RpcErrorCall(handler)
else:
raise _RpcErrorCall(handler)
def blocking_unary_response_with_call(handler):
read = handler.take_response()
if read.code is None:
unused_trailing_metadata, code, unused_details = handler.termination()
if code is grpc.StatusCode.OK:
return read.response, _Call(handler)
else:
raise _RpcErrorCall(handler)
else:
raise _RpcErrorCall(handler)
def future_call(handler):
return _FutureCall(handler, _HandlerExtras())
class ResponseIteratorCall(grpc.Call):
def __init__(self, handler):
self._handler = handler
def __iter__(self):
return self
def __next__(self):
return _next(self._handler)
def next(self):
return _next(self._handler)
def cancel(self):
_cancel(self._handler)
def is_active(self):
return _is_active(self._handler)
def time_remaining(self):
return _time_remaining(self._handler)
def add_callback(self, callback):
return _add_callback(self._handler, callback)
def initial_metadata(self):
return _initial_metadata(self._handler)
def trailing_metadata(self):
return _trailing_metadata(self._handler)
def code(self):
return _code(self._handler)
def details(self):
return _details(self._handler)

@ -0,0 +1,115 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
from grpc_testing import _common
from grpc_testing._channel import _invocation
# All per-call credentials parameters are unused by this test infrastructure.
# pylint: disable=unused-argument
class UnaryUnary(grpc.UnaryUnaryMultiCallable):
def __init__(self, method_full_rpc_name, channel_handler):
self._method_full_rpc_name = method_full_rpc_name
self._channel_handler = channel_handler
def __call__(self, request, timeout=None, metadata=None, credentials=None):
rpc_handler = self._channel_handler.invoke_rpc(
self._method_full_rpc_name, _common.fuss_with_metadata(metadata),
[request], True, timeout)
return _invocation.blocking_unary_response(rpc_handler)
def with_call(self, request, timeout=None, metadata=None, credentials=None):
rpc_handler = self._channel_handler.invoke_rpc(
self._method_full_rpc_name, _common.fuss_with_metadata(metadata),
[request], True, timeout)
return _invocation.blocking_unary_response_with_call(rpc_handler)
def future(self, request, timeout=None, metadata=None, credentials=None):
rpc_handler = self._channel_handler.invoke_rpc(
self._method_full_rpc_name, _common.fuss_with_metadata(metadata),
[request], True, timeout)
return _invocation.future_call(rpc_handler)
class UnaryStream(grpc.StreamStreamMultiCallable):
def __init__(self, method_full_rpc_name, channel_handler):
self._method_full_rpc_name = method_full_rpc_name
self._channel_handler = channel_handler
def __call__(self, request, timeout=None, metadata=None, credentials=None):
rpc_handler = self._channel_handler.invoke_rpc(
self._method_full_rpc_name,
_common.fuss_with_metadata(metadata), [request], True, timeout)
return _invocation.ResponseIteratorCall(rpc_handler)
class StreamUnary(grpc.StreamUnaryMultiCallable):
def __init__(self, method_full_rpc_name, channel_handler):
self._method_full_rpc_name = method_full_rpc_name
self._channel_handler = channel_handler
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
rpc_handler = self._channel_handler.invoke_rpc(
self._method_full_rpc_name,
_common.fuss_with_metadata(metadata), [], False, timeout)
_invocation.consume_requests(request_iterator, rpc_handler)
return _invocation.blocking_unary_response(rpc_handler)
def with_call(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
rpc_handler = self._channel_handler.invoke_rpc(
self._method_full_rpc_name,
_common.fuss_with_metadata(metadata), [], False, timeout)
_invocation.consume_requests(request_iterator, rpc_handler)
return _invocation.blocking_unary_response_with_call(rpc_handler)
def future(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
rpc_handler = self._channel_handler.invoke_rpc(
self._method_full_rpc_name,
_common.fuss_with_metadata(metadata), [], False, timeout)
_invocation.consume_requests(request_iterator, rpc_handler)
return _invocation.future_call(rpc_handler)
class StreamStream(grpc.StreamStreamMultiCallable):
def __init__(self, method_full_rpc_name, channel_handler):
self._method_full_rpc_name = method_full_rpc_name
self._channel_handler = channel_handler
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
rpc_handler = self._channel_handler.invoke_rpc(
self._method_full_rpc_name,
_common.fuss_with_metadata(metadata), [], False, timeout)
_invocation.consume_requests(request_iterator, rpc_handler)
return _invocation.ResponseIteratorCall(rpc_handler)
# pylint: enable=unused-argument

@ -0,0 +1,193 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import grpc
from grpc_testing import _common
class State(_common.ChannelRpcHandler):
def __init__(self, invocation_metadata, requests, requests_closed):
self._condition = threading.Condition()
self._invocation_metadata = invocation_metadata
self._requests = requests
self._requests_closed = requests_closed
self._initial_metadata = None
self._responses = []
self._trailing_metadata = None
self._code = None
self._details = None
def initial_metadata(self):
with self._condition:
while True:
if self._initial_metadata is None:
if self._code is None:
self._condition.wait()
else:
return _common.FUSSED_EMPTY_METADATA
else:
return self._initial_metadata
def add_request(self, request):
with self._condition:
if self._code is None and not self._requests_closed:
self._requests.append(request)
self._condition.notify_all()
return True
else:
return False
def close_requests(self):
with self._condition:
if self._code is None and not self._requests_closed:
self._requests_closed = True
self._condition.notify_all()
def take_response(self):
with self._condition:
while True:
if self._code is grpc.StatusCode.OK:
if self._responses:
response = self._responses.pop(0)
return _common.ChannelRpcRead(
response, None, None, None)
else:
return _common.ChannelRpcRead(
None, self._trailing_metadata,
grpc.StatusCode.OK, self._details)
elif self._code is None:
if self._responses:
response = self._responses.pop(0)
return _common.ChannelRpcRead(
response, None, None, None)
else:
self._condition.wait()
else:
return _common.ChannelRpcRead(
None, self._trailing_metadata, self._code,
self._details)
def termination(self):
with self._condition:
while True:
if self._code is None:
self._condition.wait()
else:
return self._trailing_metadata, self._code, self._details
def cancel(self, code, details):
with self._condition:
if self._code is None:
if self._initial_metadata is None:
self._initial_metadata = _common.FUSSED_EMPTY_METADATA
self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
self._code = code
self._details = details
self._condition.notify_all()
return True
else:
return False
def take_invocation_metadata(self):
with self._condition:
if self._invocation_metadata is None:
raise ValueError('Expected invocation metadata!')
else:
invocation_metadata = self._invocation_metadata
self._invocation_metadata = None
return invocation_metadata
def take_invocation_metadata_and_request(self):
with self._condition:
if self._invocation_metadata is None:
raise ValueError('Expected invocation metadata!')
elif not self._requests:
raise ValueError('Expected at least one request!')
else:
invocation_metadata = self._invocation_metadata
self._invocation_metadata = None
return invocation_metadata, self._requests.pop(0)
def send_initial_metadata(self, initial_metadata):
with self._condition:
self._initial_metadata = _common.fuss_with_metadata(
initial_metadata)
self._condition.notify_all()
def take_request(self):
with self._condition:
while True:
if self._requests:
return self._requests.pop(0)
else:
self._condition.wait()
def requests_closed(self):
with self._condition:
while True:
if self._requests_closed:
return
else:
self._condition.wait()
def send_response(self, response):
with self._condition:
if self._code is None:
self._responses.append(response)
self._condition.notify_all()
def terminate_with_response(
self, response, trailing_metadata, code, details):
with self._condition:
if self._initial_metadata is None:
self._initial_metadata = _common.FUSSED_EMPTY_METADATA
self._responses.append(response)
self._trailing_metadata = _common.fuss_with_metadata(
trailing_metadata)
self._code = code
self._details = details
self._condition.notify_all()
def terminate(self, trailing_metadata, code, details):
with self._condition:
if self._initial_metadata is None:
self._initial_metadata = _common.FUSSED_EMPTY_METADATA
self._trailing_metadata = _common.fuss_with_metadata(
trailing_metadata)
self._code = code
self._details = details
self._condition.notify_all()
def cancelled(self):
with self._condition:
while True:
if self._code is grpc.StatusCode.CANCELLED:
return
elif self._code is None:
self._condition.wait()
else:
raise ValueError(
'Status code unexpectedly {}!'.format(self._code))
def is_active(self):
raise NotImplementedError()
def time_remaining(self):
raise NotImplementedError()
def add_callback(self, callback):
raise NotImplementedError()

@ -0,0 +1,92 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common interfaces and implementation."""
import abc
import collections
import six
def _fuss(tuplified_metadata):
return tuplified_metadata + (
(
'grpc.metadata_added_by_runtime',
'gRPC is allowed to add metadata in transmission and does so.',
),
)
FUSSED_EMPTY_METADATA = _fuss(())
def fuss_with_metadata(metadata):
if metadata is None:
return FUSSED_EMPTY_METADATA
else:
return _fuss(tuple(metadata))
class ChannelRpcRead(
collections.namedtuple(
'ChannelRpcRead',
('response', 'trailing_metadata', 'code', 'details',))):
pass
class ChannelRpcHandler(six.with_metaclass(abc.ABCMeta)):
@abc.abstractmethod
def initial_metadata(self):
raise NotImplementedError()
@abc.abstractmethod
def add_request(self, request):
raise NotImplementedError()
@abc.abstractmethod
def close_requests(self):
raise NotImplementedError()
@abc.abstractmethod
def take_response(self):
raise NotImplementedError()
@abc.abstractmethod
def cancel(self, code, details):
raise NotImplementedError()
@abc.abstractmethod
def termination(self):
raise NotImplementedError()
@abc.abstractmethod
def is_active(self):
raise NotImplementedError()
@abc.abstractmethod
def time_remaining(self):
raise NotImplementedError()
@abc.abstractmethod
def add_callback(self, callback):
raise NotImplementedError()
class ChannelHandler(six.with_metaclass(abc.ABCMeta)):
@abc.abstractmethod
def invoke_rpc(
self, method_full_rpc_name, invocation_metadata, requests,
requests_closed, timeout):
raise NotImplementedError()

@ -68,6 +68,10 @@ PACKAGE_DATA = {
'tests.protoc_plugin.protos.invocation_testing.split_services': [
'services.proto',
],
'tests.testing.proto': [
'requests.proto',
'services.proto',
],
'tests.unit': [
'credentials/ca.pem',
'credentials/server1.key',

@ -0,0 +1,36 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example gRPC Python-using application's common code elements."""
from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2
SERVICE_NAME = 'tests_of_grpc_testing.FirstService'
UNARY_UNARY_METHOD_NAME = 'UnUn'
UNARY_STREAM_METHOD_NAME = 'UnStre'
STREAM_UNARY_METHOD_NAME = 'StreUn'
STREAM_STREAM_METHOD_NAME = 'StreStre'
UNARY_UNARY_REQUEST = requests_pb2.Up(first_up_field=2)
ERRONEOUS_UNARY_UNARY_REQUEST = requests_pb2.Up(first_up_field=3)
UNARY_UNARY_RESPONSE = services_pb2.Down(first_down_field=5)
ERRONEOUS_UNARY_UNARY_RESPONSE = services_pb2.Down(first_down_field=7)
UNARY_STREAM_REQUEST = requests_pb2.Charm(first_charm_field=11)
STREAM_UNARY_REQUEST = requests_pb2.Charm(first_charm_field=13)
STREAM_UNARY_RESPONSE = services_pb2.Strange(first_strange_field=17)
STREAM_STREAM_REQUEST = requests_pb2.Top(first_top_field=19)
STREAM_STREAM_RESPONSE = services_pb2.Bottom(first_bottom_field=23)
TWO_STREAM_STREAM_RESPONSES = (STREAM_STREAM_RESPONSE,) * 2
INFINITE_REQUEST_STREAM_TIMEOUT = 0.2

@ -0,0 +1,33 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc_testing
from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2
# TODO(https://github.com/grpc/grpc/issues/11657): Eliminate this entirely.
# TODO(https://github.com/google/protobuf/issues/3452): Eliminate this if/else.
if services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None:
FIRST_SERVICE = 'Fix protobuf issue 3452!'
FIRST_SERVICE_UNUN = 'Fix protobuf issue 3452!'
FIRST_SERVICE_UNSTRE = 'Fix protobuf issue 3452!'
FIRST_SERVICE_STREUN = 'Fix protobuf issue 3452!'
FIRST_SERVICE_STRESTRE = 'Fix protobuf issue 3452!'
else:
FIRST_SERVICE = services_pb2.DESCRIPTOR.services_by_name['FirstService']
FIRST_SERVICE_UNUN = FIRST_SERVICE.methods_by_name['UnUn']
FIRST_SERVICE_UNSTRE = FIRST_SERVICE.methods_by_name['UnStre']
FIRST_SERVICE_STREUN = FIRST_SERVICE.methods_by_name['StreUn']
FIRST_SERVICE_STRESTRE = FIRST_SERVICE.methods_by_name['StreStre']

@ -0,0 +1,260 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example gRPC Python-using client-side application."""
import collections
import enum
import threading
import time
import grpc
from tests.unit.framework.common import test_constants
from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2
from tests.testing.proto import services_pb2_grpc
from tests.testing import _application_common
@enum.unique
class Scenario(enum.Enum):
UNARY_UNARY = 'unary unary'
UNARY_STREAM = 'unary stream'
STREAM_UNARY = 'stream unary'
STREAM_STREAM = 'stream stream'
CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
CANCEL_UNARY_UNARY = 'cancel unary unary'
CANCEL_UNARY_STREAM = 'cancel unary stream'
INFINITE_REQUEST_STREAM = 'infinite request stream'
class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))):
"""Outcome of a client application scenario.
Attributes:
kind: A Kind value describing the overall kind of scenario execution.
code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
details: A status details string. Only valid if kind is Kind.RPC_ERROR.
"""
@enum.unique
class Kind(enum.Enum):
SATISFACTORY = 'satisfactory'
UNSATISFACTORY = 'unsatisfactory'
RPC_ERROR = 'rpc error'
_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None)
_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None)
class _Pipe(object):
def __init__(self):
self._condition = threading.Condition()
self._values = []
self._open = True
def __iter__(self):
return self
def _next(self):
with self._condition:
while True:
if self._values:
return self._values.pop(0)
elif not self._open:
raise StopIteration()
else:
self._condition.wait()
def __next__(self): # (Python 3 Iterator Protocol)
return self._next()
def next(self): # (Python 2 Iterator Protocol)
return self._next()
def add(self, value):
with self._condition:
self._values.append(value)
self._condition.notify_all()
def close(self):
with self._condition:
self._open = False
self._condition.notify_all()
def _run_unary_unary(stub):
response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
if _application_common.UNARY_UNARY_RESPONSE == response:
return _SATISFACTORY_OUTCOME
else:
return _UNSATISFACTORY_OUTCOME
def _run_unary_stream(stub):
response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
try:
next(response_iterator)
except StopIteration:
return _SATISFACTORY_OUTCOME
else:
return _UNSATISFACTORY_OUTCOME
def _run_stream_unary(stub):
response, call = stub.StreUn.with_call(
iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
if (_application_common.STREAM_UNARY_RESPONSE == response and
call.code() is grpc.StatusCode.OK):
return _SATISFACTORY_OUTCOME
else:
return _UNSATISFACTORY_OUTCOME
def _run_stream_stream(stub):
request_pipe = _Pipe()
response_iterator = stub.StreStre(iter(request_pipe))
request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
first_responses = next(response_iterator), next(response_iterator),
request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
second_responses = next(response_iterator), next(response_iterator),
request_pipe.close()
try:
next(response_iterator)
except StopIteration:
unexpected_extra_response = False
else:
unexpected_extra_response = True
if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and
second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
and not unexpected_extra_response):
return _SATISFACTORY_OUTCOME
else:
return _UNSATISFACTORY_OUTCOME
def _run_concurrent_stream_unary(stub):
future_calls = tuple(
stub.StreUn.future(
iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
for _ in range(test_constants.THREAD_CONCURRENCY))
for future_call in future_calls:
if future_call.code() is grpc.StatusCode.OK:
response = future_call.result()
if _application_common.STREAM_UNARY_RESPONSE != response:
return _UNSATISFACTORY_OUTCOME
else:
return _UNSATISFACTORY_OUTCOME
else:
return _SATISFACTORY_OUTCOME
def _run_concurrent_stream_stream(stub):
condition = threading.Condition()
outcomes = [None] * test_constants.RPC_CONCURRENCY
def run_stream_stream(index):
outcome = _run_stream_stream(stub)
with condition:
outcomes[index] = outcome
condition.notify()
for index in range(test_constants.RPC_CONCURRENCY):
thread = threading.Thread(target=run_stream_stream, args=(index,))
thread.start()
with condition:
while True:
if all(outcomes):
for outcome in outcomes:
if outcome.kind is not Outcome.Kind.SATISFACTORY:
return _UNSATISFACTORY_OUTCOME
else:
return _SATISFACTORY_OUTCOME
else:
condition.wait()
def _run_cancel_unary_unary(stub):
response_future_call = stub.UnUn.future(
_application_common.UNARY_UNARY_REQUEST)
initial_metadata = response_future_call.initial_metadata()
cancelled = response_future_call.cancel()
if initial_metadata is not None and cancelled:
return _SATISFACTORY_OUTCOME
else:
return _UNSATISFACTORY_OUTCOME
def _run_infinite_request_stream(stub):
def infinite_request_iterator():
while True:
yield _application_common.STREAM_UNARY_REQUEST
response_future_call = stub.StreUn.future(
infinite_request_iterator(),
timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED:
return _SATISFACTORY_OUTCOME
else:
return _UNSATISFACTORY_OUTCOME
def run(scenario, channel):
stub = services_pb2_grpc.FirstServiceStub(channel)
try:
if scenario is Scenario.UNARY_UNARY:
return _run_unary_unary(stub)
elif scenario is Scenario.UNARY_STREAM:
return _run_unary_stream(stub)
elif scenario is Scenario.STREAM_UNARY:
return _run_stream_unary(stub)
elif scenario is Scenario.STREAM_STREAM:
return _run_stream_stream(stub)
elif scenario is Scenario.CONCURRENT_STREAM_UNARY:
return _run_concurrent_stream_unary(stub)
elif scenario is Scenario.CONCURRENT_STREAM_STREAM:
return _run_concurrent_stream_stream(stub)
elif scenario is Scenario.CANCEL_UNARY_UNARY:
return _run_cancel_unary_unary(stub)
elif scenario is Scenario.INFINITE_REQUEST_STREAM:
return _run_infinite_request_stream(stub)
except grpc.RpcError as rpc_error:
return Outcome(Outcome.Kind.RPC_ERROR,
rpc_error.code(), rpc_error.details())
_IMPLEMENTATIONS = {
Scenario.UNARY_UNARY: _run_unary_unary,
Scenario.UNARY_STREAM: _run_unary_stream,
Scenario.STREAM_UNARY: _run_stream_unary,
Scenario.STREAM_STREAM: _run_stream_stream,
Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
}
def run(scenario, channel):
stub = services_pb2_grpc.FirstServiceStub(channel)
try:
return _IMPLEMENTATIONS[scenario](stub)
except grpc.RpcError as rpc_error:
return Outcome(Outcome.Kind.RPC_ERROR,
rpc_error.code(), rpc_error.details())

@ -0,0 +1,306 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
import time
import unittest
import grpc
from grpc.framework.foundation import logging_pool
from tests.unit.framework.common import test_constants
import grpc_testing
from tests.testing import _application_common
from tests.testing import _application_testing_common
from tests.testing import _client_application
from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2
# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip.
@unittest.skipIf(
services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
'Fix protobuf issue 3452!')
class ClientTest(unittest.TestCase):
def setUp(self):
# In this test the client-side application under test executes in
# a separate thread while we retain use of the test thread to "play
# server".
self._client_execution_thread_pool = logging_pool.pool(1)
self._fake_time = grpc_testing.strict_fake_time(time.time())
self._real_time = grpc_testing.strict_real_time()
self._fake_time_channel = grpc_testing.channel(
services_pb2.DESCRIPTOR.services_by_name.values(), self._fake_time)
self._real_time_channel = grpc_testing.channel(
services_pb2.DESCRIPTOR.services_by_name.values(), self._real_time)
def tearDown(self):
self._client_execution_thread_pool.shutdown(wait=True)
def test_successful_unary_unary(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run, _client_application.Scenario.UNARY_UNARY,
self._real_time_channel)
invocation_metadata, request, rpc = (
self._real_time_channel.take_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN))
rpc.send_initial_metadata(())
rpc.terminate(_application_common.UNARY_UNARY_RESPONSE, (),
grpc.StatusCode.OK, '')
application_return_value = application_future.result()
self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.SATISFACTORY)
def test_successful_unary_stream(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run, _client_application.Scenario.UNARY_STREAM,
self._fake_time_channel)
invocation_metadata, request, rpc = (
self._fake_time_channel.take_unary_stream(
_application_testing_common.FIRST_SERVICE_UNSTRE))
rpc.send_initial_metadata(())
rpc.terminate((), grpc.StatusCode.OK, '')
application_return_value = application_future.result()
self.assertEqual(_application_common.UNARY_STREAM_REQUEST, request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.SATISFACTORY)
def test_successful_stream_unary(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run, _client_application.Scenario.STREAM_UNARY,
self._real_time_channel)
invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
_application_testing_common.FIRST_SERVICE_STREUN)
rpc.send_initial_metadata(())
first_request = rpc.take_request()
second_request = rpc.take_request()
third_request = rpc.take_request()
rpc.requests_closed()
rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
grpc.StatusCode.OK, '')
application_return_value = application_future.result()
self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
first_request)
self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
second_request)
self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
third_request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.SATISFACTORY)
def test_successful_stream_stream(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run, _client_application.Scenario.STREAM_STREAM,
self._fake_time_channel)
invocation_metadata, rpc = self._fake_time_channel.take_stream_stream(
_application_testing_common.FIRST_SERVICE_STRESTRE)
first_request = rpc.take_request()
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
second_request = rpc.take_request()
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.requests_closed()
rpc.terminate((), grpc.StatusCode.OK, '')
application_return_value = application_future.result()
self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
first_request)
self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
second_request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.SATISFACTORY)
def test_concurrent_stream_stream(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run,
_client_application.Scenario.CONCURRENT_STREAM_STREAM,
self._real_time_channel)
rpcs = []
for _ in range(test_constants.RPC_CONCURRENCY):
invocation_metadata, rpc = (
self._real_time_channel.take_stream_stream(
_application_testing_common.FIRST_SERVICE_STRESTRE))
rpcs.append(rpc)
requests = {}
for rpc in rpcs:
requests[rpc] = [rpc.take_request()]
for rpc in rpcs:
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
for rpc in rpcs:
requests[rpc].append(rpc.take_request())
for rpc in rpcs:
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
for rpc in rpcs:
rpc.requests_closed()
for rpc in rpcs:
rpc.terminate((), grpc.StatusCode.OK, '')
application_return_value = application_future.result()
for requests_of_one_rpc in requests.values():
for request in requests_of_one_rpc:
self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.SATISFACTORY)
def test_cancelled_unary_unary(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run,
_client_application.Scenario.CANCEL_UNARY_UNARY,
self._fake_time_channel)
invocation_metadata, request, rpc = (
self._fake_time_channel.take_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN))
rpc.send_initial_metadata(())
rpc.cancelled()
application_return_value = application_future.result()
self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.SATISFACTORY)
def test_status_stream_unary(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run,
_client_application.Scenario.CONCURRENT_STREAM_UNARY,
self._fake_time_channel)
rpcs = tuple(
self._fake_time_channel.take_stream_unary(
_application_testing_common.FIRST_SERVICE_STREUN)[1]
for _ in range(test_constants.THREAD_CONCURRENCY))
for rpc in rpcs:
rpc.take_request()
rpc.take_request()
rpc.take_request()
rpc.requests_closed()
rpc.send_initial_metadata((
('my_metadata_key', 'My Metadata Value!',),))
for rpc in rpcs[:-1]:
rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
grpc.StatusCode.OK, '')
rpcs[-1].terminate(_application_common.STREAM_UNARY_RESPONSE, (),
grpc.StatusCode.RESOURCE_EXHAUSTED,
'nope; not able to handle all those RPCs!')
application_return_value = application_future.result()
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.UNSATISFACTORY)
def test_status_stream_stream(self):
code = grpc.StatusCode.DEADLINE_EXCEEDED
details = 'test deadline exceeded!'
application_future = self._client_execution_thread_pool.submit(
_client_application.run, _client_application.Scenario.STREAM_STREAM,
self._real_time_channel)
invocation_metadata, rpc = self._real_time_channel.take_stream_stream(
_application_testing_common.FIRST_SERVICE_STRESTRE)
first_request = rpc.take_request()
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
second_request = rpc.take_request()
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.requests_closed()
rpc.terminate((), code, details)
application_return_value = application_future.result()
self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
first_request)
self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
second_request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.RPC_ERROR)
self.assertIs(application_return_value.code, code)
self.assertEqual(application_return_value.details, details)
def test_misbehaving_server_unary_unary(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run, _client_application.Scenario.UNARY_UNARY,
self._fake_time_channel)
invocation_metadata, request, rpc = (
self._fake_time_channel.take_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN))
rpc.send_initial_metadata(())
rpc.terminate(_application_common.ERRONEOUS_UNARY_UNARY_RESPONSE, (),
grpc.StatusCode.OK, '')
application_return_value = application_future.result()
self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.UNSATISFACTORY)
def test_misbehaving_server_stream_stream(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run, _client_application.Scenario.STREAM_STREAM,
self._real_time_channel)
invocation_metadata, rpc = self._real_time_channel.take_stream_stream(
_application_testing_common.FIRST_SERVICE_STRESTRE)
first_request = rpc.take_request()
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
second_request = rpc.take_request()
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
rpc.requests_closed()
rpc.terminate((), grpc.StatusCode.OK, '')
application_return_value = application_future.result()
self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
first_request)
self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
second_request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.UNSATISFACTORY)
def test_infinite_request_stream_real_time(self):
application_future = self._client_execution_thread_pool.submit(
_client_application.run,
_client_application.Scenario.INFINITE_REQUEST_STREAM,
self._real_time_channel)
invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
_application_testing_common.FIRST_SERVICE_STREUN)
rpc.send_initial_metadata(())
first_request = rpc.take_request()
second_request = rpc.take_request()
third_request = rpc.take_request()
self._real_time.sleep_for(
_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
grpc.StatusCode.DEADLINE_EXCEEDED, '')
application_return_value = application_future.result()
self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
first_request)
self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
second_request)
self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
third_request)
self.assertIs(application_return_value.kind,
_client_application.Outcome.Kind.SATISFACTORY)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,13 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,29 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package tests_of_grpc_testing;
message Up {
int32 first_up_field = 1;
}
message Charm {
int32 first_charm_field = 1;
}
message Top {
int32 first_top_field = 1;
}

@ -0,0 +1,42 @@
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
import "tests/testing/proto/requests.proto";
package tests_of_grpc_testing;
message Down {
int32 first_down_field = 1;
}
message Strange {
int32 first_strange_field = 1;
}
message Bottom {
int32 first_bottom_field = 1;
}
service FirstService {
rpc UnUn(Up) returns (Down);
rpc UnStre(Charm) returns (stream Strange);
rpc StreUn(stream Charm) returns (Strange);
rpc StreStre(stream Top) returns (stream Bottom);
}
service SecondService {
rpc UnStre(Strange) returns (stream Charm);
}

@ -9,6 +9,7 @@
"protoc_plugin._split_definitions_test.SplitSeparateTest",
"protoc_plugin.beta_python_plugin_test.PythonPluginTest",
"reflection._reflection_servicer_test.ReflectionServicerTest",
"testing._client_test.ClientTest",
"testing._time_test.StrictFakeTimeTest",
"testing._time_test.StrictRealTimeTest",
"unit._api_test.AllTest",

Loading…
Cancel
Save