From e329de88e7ba0deacc41e2c7379223029ac652a9 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 8 Jun 2020 12:22:59 -0700 Subject: [PATCH 1/9] Add failing test --- src/python/grpcio/grpc/_simple_stubs.py | 2 +- .../grpcio/grpc/experimental/__init__.py | 5 +- .../tests_py3_only/unit/_simple_stubs_test.py | 61 ++++++++++++++++++- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/src/python/grpcio/grpc/_simple_stubs.py b/src/python/grpcio/grpc/_simple_stubs.py index 2ee82ba8642..f17dc0abe03 100644 --- a/src/python/grpcio/grpc/_simple_stubs.py +++ b/src/python/grpcio/grpc/_simple_stubs.py @@ -53,7 +53,7 @@ else: def _create_channel(target: str, options: Sequence[Tuple[str, str]], channel_credentials: Optional[grpc.ChannelCredentials], compression: Optional[grpc.Compression]) -> grpc.Channel: - if channel_credentials._credentials is grpc.experimental._insecure_channel_credentials: + if channel_credentials is grpc.experimental.insecure_channel_credentials(): _LOGGER.debug(f"Creating insecure channel with options '{options}' " + f"and compression '{compression}'") return grpc.insecure_channel(target, diff --git a/src/python/grpcio/grpc/experimental/__init__.py b/src/python/grpcio/grpc/experimental/__init__.py index eb642900d9c..9b93410df23 100644 --- a/src/python/grpcio/grpc/experimental/__init__.py +++ b/src/python/grpcio/grpc/experimental/__init__.py @@ -41,7 +41,8 @@ class UsageError(Exception): """Raised by the gRPC library to indicate usage not allowed by the API.""" -_insecure_channel_credentials = object() +_insecure_channel_credentials_sentinel = object() +_insecure_channel_credentials = grpc.ChannelCredentials(_insecure_channel_credentials_sentinel) def insecure_channel_credentials(): @@ -53,7 +54,7 @@ def insecure_channel_credentials(): used with grpc.unary_unary, grpc.unary_stream, grpc.stream_unary, or grpc.stream_stream. """ - return grpc.ChannelCredentials(_insecure_channel_credentials) + return _insecure_channel_credentials class ExperimentalApiWarning(Warning): diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index cb1ca3fabf8..c1863aa8a96 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -19,19 +19,21 @@ import os _MAXIMUM_CHANNELS = 10 -os.environ["GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"] = "1" +os.environ["GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"] = "2" os.environ["GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"] = str(_MAXIMUM_CHANNELS) import contextlib import datetime import inspect import logging +import threading import unittest import sys import time from typing import Callable, Optional from tests.unit import test_common +from tests.unit.framework.common import get_socket from tests.unit import resources import grpc import grpc.experimental @@ -311,6 +313,63 @@ class SimpleStubsTest(unittest.TestCase): insecure=True, channel_credentials=grpc.local_channel_credentials()) + def test_default_wait_for_ready(self): + addr, port, sock = get_socket() + sock.close() + target = f'{addr}:{port}' + channel = grpc._simple_stubs.ChannelCache.get().get_channel(target, + (), + None, + True, + None) + rpc_finished_event = threading.Event() + rpc_failed_event = threading.Event() + server = None + + def _on_connectivity_changed(connectivity): + nonlocal server + if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE: + self.assertFalse(rpc_finished_event.is_set()) + self.assertFalse(rpc_failed_event.is_set()) + server = test_common.test_server() + server.add_insecure_port(target) + server.add_generic_rpc_handlers((_GenericHandler(),)) + server.start() + channel.unsubscribe(_on_connectivity_changed) + elif connectivity in (grpc.ChannelConnectivity.IDLE, grpc.ChannelConnectivity.CONNECTING): + pass + else: + raise AssertionError("Encountered unknown state.") + + channel.subscribe(_on_connectivity_changed) + + def _send_rpc(): + try: + response = grpc.experimental.unary_unary( + _REQUEST, + target, + _UNARY_UNARY, + # wait_for_ready=True, # remove + # timeout=30.0, + insecure=True) + rpc_finished_event.set() + except Exception as e: + import sys; sys.stderr.write(e); sys.stderr.flush() + rpc_failed_event.set() + + t = threading.Thread(target=_send_rpc) + t.start() + t.join() + self.assertFalse(rpc_failed_event.is_set()) + self.assertTrue(rpc_finished_event.is_set()) + if server is not None: + server.stop(None) + + + def test_wait_for_ready_default_set(self): + # TODO: Implement. + pass + if __name__ == "__main__": logging.basicConfig(level=logging.INFO) From 50cab884e1b9a82c6848a5d1f1817fa1aba3d2df Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 8 Jun 2020 12:44:37 -0700 Subject: [PATCH 2/9] Default wait_for_ready to True --- src/python/grpcio/grpc/_simple_stubs.py | 40 +++++++++++++++---- .../tests_py3_only/unit/_simple_stubs_test.py | 7 ---- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/python/grpcio/grpc/_simple_stubs.py b/src/python/grpcio/grpc/_simple_stubs.py index f17dc0abe03..21e87c3b66a 100644 --- a/src/python/grpcio/grpc/_simple_stubs.py +++ b/src/python/grpcio/grpc/_simple_stubs.py @@ -49,6 +49,13 @@ if _MAXIMUM_CHANNELS_KEY in os.environ: else: _MAXIMUM_CHANNELS = 2**8 +_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS" +if _DEFAULT_TIMEOUT_KEY in os.environ: + _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY]) + _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT) +else: + _DEFAULT_TIMEOUT = 60.0 + def _create_channel(target: str, options: Sequence[Tuple[str, str]], channel_credentials: Optional[grpc.ChannelCredentials], @@ -165,6 +172,15 @@ class ChannelCache: return len(self._mapping) +def _get_wait_for_ready_settings(wait_for_ready: Optional[bool], + timeout: Optional[float]) -> Tuple[bool, float]: + if wait_for_ready is None: + wait_for_ready = True + if wait_for_ready and timeout is None: + timeout = _DEFAULT_TIMEOUT + return wait_for_ready, timeout + + @experimental_api def unary_unary( request: RequestType, @@ -221,9 +237,10 @@ def unary_unary( immediately if the connection is not ready at the time the RPC is invoked, or if it should wait until the connection to the server becomes ready. When using this option, the user will likely also want - to set a timeout. Defaults to False. + to set a timeout. Defaults to True. timeout: An optional duration of time in seconds to allow for the RPC, - after which an exception will be raised. + after which an exception will be raised. If timeout is unspecified and + wait_for_ready is True, defaults to one minute. metadata: Optional metadata to send to the server. Returns: @@ -234,6 +251,7 @@ def unary_unary( compression) multicallable = channel.unary_unary(method, request_serializer, response_deserializer) + wait_for_ready, timeout = _get_wait_for_ready_settings(wait_for_ready, timeout) return multicallable(request, metadata=metadata, wait_for_ready=wait_for_ready, @@ -296,9 +314,10 @@ def unary_stream( immediately if the connection is not ready at the time the RPC is invoked, or if it should wait until the connection to the server becomes ready. When using this option, the user will likely also want - to set a timeout. Defaults to False. + to set a timeout. Defaults to True. timeout: An optional duration of time in seconds to allow for the RPC, - after which an exception will be raised. + after which an exception will be raised. If timeout is unspecified and + wait_for_ready is True, defaults to one minute. metadata: Optional metadata to send to the server. Returns: @@ -309,6 +328,7 @@ def unary_stream( compression) multicallable = channel.unary_stream(method, request_serializer, response_deserializer) + wait_for_ready, timeout = _get_wait_for_ready_settings(wait_for_ready, timeout) return multicallable(request, metadata=metadata, wait_for_ready=wait_for_ready, @@ -371,9 +391,10 @@ def stream_unary( immediately if the connection is not ready at the time the RPC is invoked, or if it should wait until the connection to the server becomes ready. When using this option, the user will likely also want - to set a timeout. Defaults to False. + to set a timeout. Defaults to True. timeout: An optional duration of time in seconds to allow for the RPC, - after which an exception will be raised. + after which an exception will be raised. If timeout is unspecified and + wait_for_ready is True, defaults to one minute. metadata: Optional metadata to send to the server. Returns: @@ -384,6 +405,7 @@ def stream_unary( compression) multicallable = channel.stream_unary(method, request_serializer, response_deserializer) + wait_for_ready, timeout = _get_wait_for_ready_settings(wait_for_ready, timeout) return multicallable(request_iterator, metadata=metadata, wait_for_ready=wait_for_ready, @@ -446,9 +468,10 @@ def stream_stream( immediately if the connection is not ready at the time the RPC is invoked, or if it should wait until the connection to the server becomes ready. When using this option, the user will likely also want - to set a timeout. Defaults to False. + to set a timeout. Defaults to True. timeout: An optional duration of time in seconds to allow for the RPC, - after which an exception will be raised. + after which an exception will be raised. If timeout is unspecified and + wait_for_ready is True, defaults to one minute. metadata: Optional metadata to send to the server. Returns: @@ -459,6 +482,7 @@ def stream_stream( compression) multicallable = channel.stream_stream(method, request_serializer, response_deserializer) + wait_for_ready, timeout = _get_wait_for_ready_settings(wait_for_ready, timeout) return multicallable(request_iterator, metadata=metadata, wait_for_ready=wait_for_ready, diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index c1863aa8a96..a3c55386dbf 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -349,8 +349,6 @@ class SimpleStubsTest(unittest.TestCase): _REQUEST, target, _UNARY_UNARY, - # wait_for_ready=True, # remove - # timeout=30.0, insecure=True) rpc_finished_event.set() except Exception as e: @@ -366,11 +364,6 @@ class SimpleStubsTest(unittest.TestCase): server.stop(None) - def test_wait_for_ready_default_set(self): - # TODO: Implement. - pass - - if __name__ == "__main__": logging.basicConfig(level=logging.INFO) unittest.main(verbosity=2) From 25cdba8a706bf6f50a36e6ae073bb3ff71f418cd Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 8 Jun 2020 12:50:33 -0700 Subject: [PATCH 3/9] Yapf --- src/python/grpcio/grpc/_simple_stubs.py | 15 +++++++++----- .../grpcio/grpc/experimental/__init__.py | 3 ++- .../tests_py3_only/unit/_simple_stubs_test.py | 20 ++++++++----------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/python/grpcio/grpc/_simple_stubs.py b/src/python/grpcio/grpc/_simple_stubs.py index 21e87c3b66a..5bbf2845bf4 100644 --- a/src/python/grpcio/grpc/_simple_stubs.py +++ b/src/python/grpcio/grpc/_simple_stubs.py @@ -173,7 +173,8 @@ class ChannelCache: def _get_wait_for_ready_settings(wait_for_ready: Optional[bool], - timeout: Optional[float]) -> Tuple[bool, float]: + timeout: Optional[float] + ) -> Tuple[bool, float]: if wait_for_ready is None: wait_for_ready = True if wait_for_ready and timeout is None: @@ -251,7 +252,8 @@ def unary_unary( compression) multicallable = channel.unary_unary(method, request_serializer, response_deserializer) - wait_for_ready, timeout = _get_wait_for_ready_settings(wait_for_ready, timeout) + wait_for_ready, timeout = _get_wait_for_ready_settings( + wait_for_ready, timeout) return multicallable(request, metadata=metadata, wait_for_ready=wait_for_ready, @@ -328,7 +330,8 @@ def unary_stream( compression) multicallable = channel.unary_stream(method, request_serializer, response_deserializer) - wait_for_ready, timeout = _get_wait_for_ready_settings(wait_for_ready, timeout) + wait_for_ready, timeout = _get_wait_for_ready_settings( + wait_for_ready, timeout) return multicallable(request, metadata=metadata, wait_for_ready=wait_for_ready, @@ -405,7 +408,8 @@ def stream_unary( compression) multicallable = channel.stream_unary(method, request_serializer, response_deserializer) - wait_for_ready, timeout = _get_wait_for_ready_settings(wait_for_ready, timeout) + wait_for_ready, timeout = _get_wait_for_ready_settings( + wait_for_ready, timeout) return multicallable(request_iterator, metadata=metadata, wait_for_ready=wait_for_ready, @@ -482,7 +486,8 @@ def stream_stream( compression) multicallable = channel.stream_stream(method, request_serializer, response_deserializer) - wait_for_ready, timeout = _get_wait_for_ready_settings(wait_for_ready, timeout) + wait_for_ready, timeout = _get_wait_for_ready_settings( + wait_for_ready, timeout) return multicallable(request_iterator, metadata=metadata, wait_for_ready=wait_for_ready, diff --git a/src/python/grpcio/grpc/experimental/__init__.py b/src/python/grpcio/grpc/experimental/__init__.py index 9b93410df23..1ca0e7005c4 100644 --- a/src/python/grpcio/grpc/experimental/__init__.py +++ b/src/python/grpcio/grpc/experimental/__init__.py @@ -42,7 +42,8 @@ class UsageError(Exception): _insecure_channel_credentials_sentinel = object() -_insecure_channel_credentials = grpc.ChannelCredentials(_insecure_channel_credentials_sentinel) +_insecure_channel_credentials = grpc.ChannelCredentials( + _insecure_channel_credentials_sentinel) def insecure_channel_credentials(): diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index a3c55386dbf..f7a80081fce 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -317,11 +317,8 @@ class SimpleStubsTest(unittest.TestCase): addr, port, sock = get_socket() sock.close() target = f'{addr}:{port}' - channel = grpc._simple_stubs.ChannelCache.get().get_channel(target, - (), - None, - True, - None) + channel = grpc._simple_stubs.ChannelCache.get().get_channel( + target, (), None, True, None) rpc_finished_event = threading.Event() rpc_failed_event = threading.Event() server = None @@ -336,7 +333,8 @@ class SimpleStubsTest(unittest.TestCase): server.add_generic_rpc_handlers((_GenericHandler(),)) server.start() channel.unsubscribe(_on_connectivity_changed) - elif connectivity in (grpc.ChannelConnectivity.IDLE, grpc.ChannelConnectivity.CONNECTING): + elif connectivity in (grpc.ChannelConnectivity.IDLE, + grpc.ChannelConnectivity.CONNECTING): pass else: raise AssertionError("Encountered unknown state.") @@ -345,14 +343,12 @@ class SimpleStubsTest(unittest.TestCase): def _send_rpc(): try: - response = grpc.experimental.unary_unary( - _REQUEST, - target, - _UNARY_UNARY, - insecure=True) + response = grpc.experimental.unary_unary(_REQUEST, + target, + _UNARY_UNARY, + insecure=True) rpc_finished_event.set() except Exception as e: - import sys; sys.stderr.write(e); sys.stderr.flush() rpc_failed_event.set() t = threading.Thread(target=_send_rpc) From 00d20bb70a6aaf64154659d7ace5d442a59d8064 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 9 Jun 2020 10:56:34 -0700 Subject: [PATCH 4/9] Use self.fail --- .../grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index f7a80081fce..dc6a047f641 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -337,7 +337,7 @@ class SimpleStubsTest(unittest.TestCase): grpc.ChannelConnectivity.CONNECTING): pass else: - raise AssertionError("Encountered unknown state.") + self.fail("Encountered unknown state.") channel.subscribe(_on_connectivity_changed) From 6344b2b97f409e3c3c37becc19e07ad6ad43fd60 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 10 Jun 2020 11:48:22 -0700 Subject: [PATCH 5/9] Support infinite timeouts --- src/python/grpcio/grpc/_simple_stubs.py | 48 +- .../tests_py3_only/unit/_simple_stubs_test.py | 426 ++++++++++-------- 2 files changed, 268 insertions(+), 206 deletions(-) diff --git a/src/python/grpcio/grpc/_simple_stubs.py b/src/python/grpcio/grpc/_simple_stubs.py index 5bbf2845bf4..7265769f3ef 100644 --- a/src/python/grpcio/grpc/_simple_stubs.py +++ b/src/python/grpcio/grpc/_simple_stubs.py @@ -56,6 +56,11 @@ if _DEFAULT_TIMEOUT_KEY in os.environ: else: _DEFAULT_TIMEOUT = 60.0 +class _DefaultTimeoutSentinelType(object): + pass + +_DEFAULT_TIMEOUT_SENTINEL = _DefaultTimeoutSentinelType() + def _create_channel(target: str, options: Sequence[Tuple[str, str]], channel_credentials: Optional[grpc.ChannelCredentials], @@ -173,11 +178,12 @@ class ChannelCache: def _get_wait_for_ready_settings(wait_for_ready: Optional[bool], - timeout: Optional[float] - ) -> Tuple[bool, float]: + timeout: Union[Optional[float], _DefaultTimeoutSentinelType] + ) -> Tuple[bool, Optional[float]]: if wait_for_ready is None: wait_for_ready = True - if wait_for_ready and timeout is None: + # TODO: You don't actually need this sentinel... + if timeout is _DEFAULT_TIMEOUT_SENTINEL: timeout = _DEFAULT_TIMEOUT return wait_for_ready, timeout @@ -195,7 +201,7 @@ def unary_unary( call_credentials: Optional[grpc.CallCredentials] = None, compression: Optional[grpc.Compression] = None, wait_for_ready: Optional[bool] = None, - timeout: Optional[float] = None, + timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL, metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None ) -> ResponseType: """Invokes a unary-unary RPC without an explicitly specified channel. @@ -240,8 +246,11 @@ def unary_unary( becomes ready. When using this option, the user will likely also want to set a timeout. Defaults to True. timeout: An optional duration of time in seconds to allow for the RPC, - after which an exception will be raised. If timeout is unspecified and - wait_for_ready is True, defaults to one minute. + after which an exception will be raised. If timeout is unspecified, + defaults to a timeout controlled by the + GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is + unset, defaults to 60 seconds. Supply a value of None to indicate that + no timeout should be enforced. metadata: Optional metadata to send to the server. Returns: @@ -274,7 +283,7 @@ def unary_stream( call_credentials: Optional[grpc.CallCredentials] = None, compression: Optional[grpc.Compression] = None, wait_for_ready: Optional[bool] = None, - timeout: Optional[float] = None, + timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL, metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None ) -> Iterator[ResponseType]: """Invokes a unary-stream RPC without an explicitly specified channel. @@ -318,8 +327,11 @@ def unary_stream( becomes ready. When using this option, the user will likely also want to set a timeout. Defaults to True. timeout: An optional duration of time in seconds to allow for the RPC, - after which an exception will be raised. If timeout is unspecified and - wait_for_ready is True, defaults to one minute. + after which an exception will be raised. If timeout is unspecified, + defaults to a timeout controlled by the + GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is + unset, defaults to 60 seconds. Supply a value of None to indicate that + no timeout should be enforced. metadata: Optional metadata to send to the server. Returns: @@ -352,7 +364,7 @@ def stream_unary( call_credentials: Optional[grpc.CallCredentials] = None, compression: Optional[grpc.Compression] = None, wait_for_ready: Optional[bool] = None, - timeout: Optional[float] = None, + timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL, metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None ) -> ResponseType: """Invokes a stream-unary RPC without an explicitly specified channel. @@ -396,8 +408,11 @@ def stream_unary( becomes ready. When using this option, the user will likely also want to set a timeout. Defaults to True. timeout: An optional duration of time in seconds to allow for the RPC, - after which an exception will be raised. If timeout is unspecified and - wait_for_ready is True, defaults to one minute. + after which an exception will be raised. If timeout is unspecified, + defaults to a timeout controlled by the + GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is + unset, defaults to 60 seconds. Supply a value of None to indicate that + no timeout should be enforced. metadata: Optional metadata to send to the server. Returns: @@ -430,7 +445,7 @@ def stream_stream( call_credentials: Optional[grpc.CallCredentials] = None, compression: Optional[grpc.Compression] = None, wait_for_ready: Optional[bool] = None, - timeout: Optional[float] = None, + timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL, metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None ) -> Iterator[ResponseType]: """Invokes a stream-stream RPC without an explicitly specified channel. @@ -474,8 +489,11 @@ def stream_stream( becomes ready. When using this option, the user will likely also want to set a timeout. Defaults to True. timeout: An optional duration of time in seconds to allow for the RPC, - after which an exception will be raised. If timeout is unspecified and - wait_for_ready is True, defaults to one minute. + after which an exception will be raised. If timeout is unspecified, + defaults to a timeout controlled by the + GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is + unset, defaults to 60 seconds. Supply a value of None to indicate that + no timeout should be enforced. metadata: Optional metadata to send to the server. Returns: diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index dc6a047f641..207760ad6e5 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -19,8 +19,11 @@ import os _MAXIMUM_CHANNELS = 10 +_DEFAULT_TIMEOUT = 1.0 + os.environ["GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"] = "2" os.environ["GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"] = str(_MAXIMUM_CHANNELS) +os.environ["GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"] = str(_DEFAULT_TIMEOUT) import contextlib import datetime @@ -52,6 +55,7 @@ _UNARY_UNARY = "/test/UnaryUnary" _UNARY_STREAM = "/test/UnaryStream" _STREAM_UNARY = "/test/StreamUnary" _STREAM_STREAM = "/test/StreamStream" +_BLACK_HOLE = "/test/BlackHole" @contextlib.contextmanager @@ -82,6 +86,15 @@ def _stream_stream_handler(request_iterator, context): yield request +def _black_hole_handler(request, context): + event = threading.Event() + def _on_done(): + event.set() + context.add_callback(_on_done) + while not event.is_set(): + time.sleep(0.1) + + class _GenericHandler(grpc.GenericRpcHandler): def service(self, handler_call_details): @@ -93,6 +106,8 @@ class _GenericHandler(grpc.GenericRpcHandler): return grpc.stream_unary_rpc_method_handler(_stream_unary_handler) elif handler_call_details.method == _STREAM_STREAM: return grpc.stream_stream_rpc_method_handler(_stream_stream_handler) + elif handler_call_details.method == _BLACK_HOLE: + return grpc.unary_unary_rpc_method_handler(_black_hole_handler) else: raise NotImplementedError() @@ -163,201 +178,230 @@ class SimpleStubsTest(unittest.TestCase): else: self.fail(message() + " after " + str(timeout)) - def test_unary_unary_insecure(self): - with _server(None) as port: - target = f'localhost:{port}' - response = grpc.experimental.unary_unary( - _REQUEST, - target, - _UNARY_UNARY, - channel_credentials=grpc.experimental. - insecure_channel_credentials()) - self.assertEqual(_REQUEST, response) - - def test_unary_unary_secure(self): - with _server(grpc.local_server_credentials()) as port: - target = f'localhost:{port}' - response = grpc.experimental.unary_unary( - _REQUEST, - target, - _UNARY_UNARY, - channel_credentials=grpc.local_channel_credentials()) - self.assertEqual(_REQUEST, response) - - def test_channels_cached(self): - with _server(grpc.local_server_credentials()) as port: - target = f'localhost:{port}' - test_name = inspect.stack()[0][3] - args = (_REQUEST, target, _UNARY_UNARY) - kwargs = {"channel_credentials": grpc.local_channel_credentials()} - - def _invoke(seed: str): - run_kwargs = dict(kwargs) - run_kwargs["options"] = ((test_name + seed, ""),) - grpc.experimental.unary_unary(*args, **run_kwargs) - - self.assert_cached(_invoke) - - def test_channels_evicted(self): - with _server(grpc.local_server_credentials()) as port: - target = f'localhost:{port}' - response = grpc.experimental.unary_unary( - _REQUEST, - target, - _UNARY_UNARY, - channel_credentials=grpc.local_channel_credentials()) - self.assert_eventually( - lambda: grpc._simple_stubs.ChannelCache.get( - )._test_only_channel_count() == 0, - message=lambda: - f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain" - ) - - def test_total_channels_enforced(self): - with _server(grpc.local_server_credentials()) as port: - target = f'localhost:{port}' - for i in range(_STRESS_EPOCHS): - # Ensure we get a new channel each time. - options = (("foo", str(i)),) - # Send messages at full blast. - grpc.experimental.unary_unary( - _REQUEST, - target, - _UNARY_UNARY, - options=options, - channel_credentials=grpc.local_channel_credentials()) - self.assert_eventually( - lambda: grpc._simple_stubs.ChannelCache.get( - )._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1, - message=lambda: - f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain" - ) - - def test_unary_stream(self): - with _server(grpc.local_server_credentials()) as port: - target = f'localhost:{port}' - for response in grpc.experimental.unary_stream( - _REQUEST, - target, - _UNARY_STREAM, - channel_credentials=grpc.local_channel_credentials()): - self.assertEqual(_REQUEST, response) - - def test_stream_unary(self): - - def request_iter(): - for _ in range(_CLIENT_REQUEST_COUNT): - yield _REQUEST - - with _server(grpc.local_server_credentials()) as port: - target = f'localhost:{port}' - response = grpc.experimental.stream_unary( - request_iter(), - target, - _STREAM_UNARY, - channel_credentials=grpc.local_channel_credentials()) - self.assertEqual(_REQUEST, response) - - def test_stream_stream(self): - - def request_iter(): - for _ in range(_CLIENT_REQUEST_COUNT): - yield _REQUEST - - with _server(grpc.local_server_credentials()) as port: - target = f'localhost:{port}' - for response in grpc.experimental.stream_stream( - request_iter(), - target, - _STREAM_STREAM, - channel_credentials=grpc.local_channel_credentials()): - self.assertEqual(_REQUEST, response) - - def test_default_ssl(self): - _private_key = resources.private_key() - _certificate_chain = resources.certificate_chain() - _server_certs = ((_private_key, _certificate_chain),) - _server_host_override = 'foo.test.google.fr' - _test_root_certificates = resources.test_root_certificates() - _property_options = (( - 'grpc.ssl_target_name_override', - _server_host_override, - ),) - cert_dir = os.path.join(os.path.dirname(resources.__file__), - "credentials") - cert_file = os.path.join(cert_dir, "ca.pem") - with _env("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", cert_file): - server_creds = grpc.ssl_server_credentials(_server_certs) - with _server(server_creds) as port: - target = f'localhost:{port}' - response = grpc.experimental.unary_unary( - _REQUEST, target, _UNARY_UNARY, options=_property_options) - - def test_insecure_sugar(self): - with _server(None) as port: - target = f'localhost:{port}' - response = grpc.experimental.unary_unary(_REQUEST, - target, - _UNARY_UNARY, - insecure=True) - self.assertEqual(_REQUEST, response) - - def test_insecure_sugar_mutually_exclusive(self): + # def test_unary_unary_insecure(self): + # with _server(None) as port: + # target = f'localhost:{port}' + # response = grpc.experimental.unary_unary( + # _REQUEST, + # target, + # _UNARY_UNARY, + # channel_credentials=grpc.experimental. + # insecure_channel_credentials()) + # self.assertEqual(_REQUEST, response) + + # def test_unary_unary_secure(self): + # with _server(grpc.local_server_credentials()) as port: + # target = f'localhost:{port}' + # response = grpc.experimental.unary_unary( + # _REQUEST, + # target, + # _UNARY_UNARY, + # channel_credentials=grpc.local_channel_credentials()) + # self.assertEqual(_REQUEST, response) + + # def test_channels_cached(self): + # with _server(grpc.local_server_credentials()) as port: + # target = f'localhost:{port}' + # test_name = inspect.stack()[0][3] + # args = (_REQUEST, target, _UNARY_UNARY) + # kwargs = {"channel_credentials": grpc.local_channel_credentials()} + + # def _invoke(seed: str): + # run_kwargs = dict(kwargs) + # run_kwargs["options"] = ((test_name + seed, ""),) + # grpc.experimental.unary_unary(*args, **run_kwargs) + + # self.assert_cached(_invoke) + + # def test_channels_evicted(self): + # with _server(grpc.local_server_credentials()) as port: + # target = f'localhost:{port}' + # response = grpc.experimental.unary_unary( + # _REQUEST, + # target, + # _UNARY_UNARY, + # channel_credentials=grpc.local_channel_credentials()) + # self.assert_eventually( + # lambda: grpc._simple_stubs.ChannelCache.get( + # )._test_only_channel_count() == 0, + # message=lambda: + # f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain" + # ) + + # def test_total_channels_enforced(self): + # with _server(grpc.local_server_credentials()) as port: + # target = f'localhost:{port}' + # for i in range(_STRESS_EPOCHS): + # # Ensure we get a new channel each time. + # options = (("foo", str(i)),) + # # Send messages at full blast. + # grpc.experimental.unary_unary( + # _REQUEST, + # target, + # _UNARY_UNARY, + # options=options, + # channel_credentials=grpc.local_channel_credentials()) + # self.assert_eventually( + # lambda: grpc._simple_stubs.ChannelCache.get( + # )._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1, + # message=lambda: + # f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain" + # ) + + # def test_unary_stream(self): + # with _server(grpc.local_server_credentials()) as port: + # target = f'localhost:{port}' + # for response in grpc.experimental.unary_stream( + # _REQUEST, + # target, + # _UNARY_STREAM, + # channel_credentials=grpc.local_channel_credentials()): + # self.assertEqual(_REQUEST, response) + + # def test_stream_unary(self): + + # def request_iter(): + # for _ in range(_CLIENT_REQUEST_COUNT): + # yield _REQUEST + + # with _server(grpc.local_server_credentials()) as port: + # target = f'localhost:{port}' + # response = grpc.experimental.stream_unary( + # request_iter(), + # target, + # _STREAM_UNARY, + # channel_credentials=grpc.local_channel_credentials()) + # self.assertEqual(_REQUEST, response) + + # def test_stream_stream(self): + + # def request_iter(): + # for _ in range(_CLIENT_REQUEST_COUNT): + # yield _REQUEST + + # with _server(grpc.local_server_credentials()) as port: + # target = f'localhost:{port}' + # for response in grpc.experimental.stream_stream( + # request_iter(), + # target, + # _STREAM_STREAM, + # channel_credentials=grpc.local_channel_credentials()): + # self.assertEqual(_REQUEST, response) + + # def test_default_ssl(self): + # _private_key = resources.private_key() + # _certificate_chain = resources.certificate_chain() + # _server_certs = ((_private_key, _certificate_chain),) + # _server_host_override = 'foo.test.google.fr' + # _test_root_certificates = resources.test_root_certificates() + # _property_options = (( + # 'grpc.ssl_target_name_override', + # _server_host_override, + # ),) + # cert_dir = os.path.join(os.path.dirname(resources.__file__), + # "credentials") + # cert_file = os.path.join(cert_dir, "ca.pem") + # with _env("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", cert_file): + # server_creds = grpc.ssl_server_credentials(_server_certs) + # with _server(server_creds) as port: + # target = f'localhost:{port}' + # response = grpc.experimental.unary_unary( + # _REQUEST, target, _UNARY_UNARY, options=_property_options) + + # def test_insecure_sugar(self): + # with _server(None) as port: + # target = f'localhost:{port}' + # response = grpc.experimental.unary_unary(_REQUEST, + # target, + # _UNARY_UNARY, + # insecure=True) + # self.assertEqual(_REQUEST, response) + + # def test_insecure_sugar_mutually_exclusive(self): + # with _server(None) as port: + # target = f'localhost:{port}' + # with self.assertRaises(ValueError): + # response = grpc.experimental.unary_unary( + # _REQUEST, + # target, + # _UNARY_UNARY, + # insecure=True, + # channel_credentials=grpc.local_channel_credentials()) + + # def test_default_wait_for_ready(self): + # addr, port, sock = get_socket() + # sock.close() + # target = f'{addr}:{port}' + # channel = grpc._simple_stubs.ChannelCache.get().get_channel( + # target, (), None, True, None) + # rpc_finished_event = threading.Event() + # rpc_failed_event = threading.Event() + # server = None + + # def _on_connectivity_changed(connectivity): + # nonlocal server + # if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE: + # self.assertFalse(rpc_finished_event.is_set()) + # self.assertFalse(rpc_failed_event.is_set()) + # server = test_common.test_server() + # server.add_insecure_port(target) + # server.add_generic_rpc_handlers((_GenericHandler(),)) + # server.start() + # channel.unsubscribe(_on_connectivity_changed) + # elif connectivity in (grpc.ChannelConnectivity.IDLE, + # grpc.ChannelConnectivity.CONNECTING): + # pass + # else: + # self.fail("Encountered unknown state.") + + # channel.subscribe(_on_connectivity_changed) + + # def _send_rpc(): + # try: + # response = grpc.experimental.unary_unary(_REQUEST, + # target, + # _UNARY_UNARY, + # insecure=True) + # rpc_finished_event.set() + # except Exception as e: + # rpc_failed_event.set() + + # t = threading.Thread(target=_send_rpc) + # t.start() + # t.join() + # self.assertFalse(rpc_failed_event.is_set()) + # self.assertTrue(rpc_finished_event.is_set()) + # if server is not None: + # server.stop(None) + + def assert_times_out(self, invocation_args): with _server(None) as port: target = f'localhost:{port}' - with self.assertRaises(ValueError): - response = grpc.experimental.unary_unary( - _REQUEST, - target, - _UNARY_UNARY, - insecure=True, - channel_credentials=grpc.local_channel_credentials()) - - def test_default_wait_for_ready(self): - addr, port, sock = get_socket() - sock.close() - target = f'{addr}:{port}' - channel = grpc._simple_stubs.ChannelCache.get().get_channel( - target, (), None, True, None) - rpc_finished_event = threading.Event() - rpc_failed_event = threading.Event() - server = None - - def _on_connectivity_changed(connectivity): - nonlocal server - if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE: - self.assertFalse(rpc_finished_event.is_set()) - self.assertFalse(rpc_failed_event.is_set()) - server = test_common.test_server() - server.add_insecure_port(target) - server.add_generic_rpc_handlers((_GenericHandler(),)) - server.start() - channel.unsubscribe(_on_connectivity_changed) - elif connectivity in (grpc.ChannelConnectivity.IDLE, - grpc.ChannelConnectivity.CONNECTING): - pass - else: - self.fail("Encountered unknown state.") - - channel.subscribe(_on_connectivity_changed) - - def _send_rpc(): - try: + with self.assertRaises(grpc.RpcError) as cm: response = grpc.experimental.unary_unary(_REQUEST, target, - _UNARY_UNARY, - insecure=True) - rpc_finished_event.set() - except Exception as e: - rpc_failed_event.set() - - t = threading.Thread(target=_send_rpc) - t.start() - t.join() - self.assertFalse(rpc_failed_event.is_set()) - self.assertTrue(rpc_finished_event.is_set()) - if server is not None: - server.stop(None) + _BLACK_HOLE, + insecure=True, + **invocation_args) + self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, cm.exception.code()) + + def test_default_timeout(self): + not_present = object() + wait_for_ready_values = [True, not_present] + timeout_values = [0.5, not_present] + cases = [] + for wait_for_ready in wait_for_ready_values: + for timeout in timeout_values: + case = {} + if timeout is not not_present: + case["timeout"] = timeout + if wait_for_ready is not not_present: + case["wait_for_ready"] = wait_for_ready + cases.append(case) + + for case in cases: + with self.subTest(**case): + self.assert_times_out(case) if __name__ == "__main__": From 357c78c822336284370a0f7e9d0217c89a1b49e7 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 10 Jun 2020 11:54:26 -0700 Subject: [PATCH 6/9] Simplify implementation --- src/python/grpcio/grpc/_simple_stubs.py | 36 ++++++------------------- 1 file changed, 8 insertions(+), 28 deletions(-) diff --git a/src/python/grpcio/grpc/_simple_stubs.py b/src/python/grpcio/grpc/_simple_stubs.py index 7265769f3ef..baa7ae5dbe1 100644 --- a/src/python/grpcio/grpc/_simple_stubs.py +++ b/src/python/grpcio/grpc/_simple_stubs.py @@ -56,11 +56,6 @@ if _DEFAULT_TIMEOUT_KEY in os.environ: else: _DEFAULT_TIMEOUT = 60.0 -class _DefaultTimeoutSentinelType(object): - pass - -_DEFAULT_TIMEOUT_SENTINEL = _DefaultTimeoutSentinelType() - def _create_channel(target: str, options: Sequence[Tuple[str, str]], channel_credentials: Optional[grpc.ChannelCredentials], @@ -177,17 +172,6 @@ class ChannelCache: return len(self._mapping) -def _get_wait_for_ready_settings(wait_for_ready: Optional[bool], - timeout: Union[Optional[float], _DefaultTimeoutSentinelType] - ) -> Tuple[bool, Optional[float]]: - if wait_for_ready is None: - wait_for_ready = True - # TODO: You don't actually need this sentinel... - if timeout is _DEFAULT_TIMEOUT_SENTINEL: - timeout = _DEFAULT_TIMEOUT - return wait_for_ready, timeout - - @experimental_api def unary_unary( request: RequestType, @@ -201,7 +185,7 @@ def unary_unary( call_credentials: Optional[grpc.CallCredentials] = None, compression: Optional[grpc.Compression] = None, wait_for_ready: Optional[bool] = None, - timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL, + timeout: Optional[float] = _DEFAULT_TIMEOUT, metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None ) -> ResponseType: """Invokes a unary-unary RPC without an explicitly specified channel. @@ -261,8 +245,7 @@ def unary_unary( compression) multicallable = channel.unary_unary(method, request_serializer, response_deserializer) - wait_for_ready, timeout = _get_wait_for_ready_settings( - wait_for_ready, timeout) + wait_for_ready = wait_for_ready if wait_for_ready is not None else True return multicallable(request, metadata=metadata, wait_for_ready=wait_for_ready, @@ -283,7 +266,7 @@ def unary_stream( call_credentials: Optional[grpc.CallCredentials] = None, compression: Optional[grpc.Compression] = None, wait_for_ready: Optional[bool] = None, - timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL, + timeout: Optional[float] = _DEFAULT_TIMEOUT, metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None ) -> Iterator[ResponseType]: """Invokes a unary-stream RPC without an explicitly specified channel. @@ -342,8 +325,7 @@ def unary_stream( compression) multicallable = channel.unary_stream(method, request_serializer, response_deserializer) - wait_for_ready, timeout = _get_wait_for_ready_settings( - wait_for_ready, timeout) + wait_for_ready = wait_for_ready if wait_for_ready is not None else True return multicallable(request, metadata=metadata, wait_for_ready=wait_for_ready, @@ -364,7 +346,7 @@ def stream_unary( call_credentials: Optional[grpc.CallCredentials] = None, compression: Optional[grpc.Compression] = None, wait_for_ready: Optional[bool] = None, - timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL, + timeout: Optional[float] = _DEFAULT_TIMEOUT, metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None ) -> ResponseType: """Invokes a stream-unary RPC without an explicitly specified channel. @@ -423,8 +405,7 @@ def stream_unary( compression) multicallable = channel.stream_unary(method, request_serializer, response_deserializer) - wait_for_ready, timeout = _get_wait_for_ready_settings( - wait_for_ready, timeout) + wait_for_ready = wait_for_ready if wait_for_ready is not None else True return multicallable(request_iterator, metadata=metadata, wait_for_ready=wait_for_ready, @@ -445,7 +426,7 @@ def stream_stream( call_credentials: Optional[grpc.CallCredentials] = None, compression: Optional[grpc.Compression] = None, wait_for_ready: Optional[bool] = None, - timeout: Optional[float] = _DEFAULT_TIMEOUT_SENTINEL, + timeout: Optional[float] = _DEFAULT_TIMEOUT, metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None ) -> Iterator[ResponseType]: """Invokes a stream-stream RPC without an explicitly specified channel. @@ -504,8 +485,7 @@ def stream_stream( compression) multicallable = channel.stream_stream(method, request_serializer, response_deserializer) - wait_for_ready, timeout = _get_wait_for_ready_settings( - wait_for_ready, timeout) + wait_for_ready = wait_for_ready if wait_for_ready is not None else True return multicallable(request_iterator, metadata=metadata, wait_for_ready=wait_for_ready, From 5811df0fd08daec5b58df2fda7ffc77a3d59aacf Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 10 Jun 2020 12:00:18 -0700 Subject: [PATCH 7/9] Yapf --- .../grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index 207760ad6e5..4fa240f7f9b 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -88,8 +88,10 @@ def _stream_stream_handler(request_iterator, context): def _black_hole_handler(request, context): event = threading.Event() + def _on_done(): event.set() + context.add_callback(_on_done) while not event.is_set(): time.sleep(0.1) @@ -383,7 +385,8 @@ class SimpleStubsTest(unittest.TestCase): _BLACK_HOLE, insecure=True, **invocation_args) - self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, cm.exception.code()) + self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, + cm.exception.code()) def test_default_timeout(self): not_present = object() From 93477ae55fa3157b38b0fd5e5c1478fff671ea61 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 10 Jun 2020 12:06:09 -0700 Subject: [PATCH 8/9] That line changed count seemed a bit big --- .../tests_py3_only/unit/_simple_stubs_test.py | 390 +++++++++--------- 1 file changed, 195 insertions(+), 195 deletions(-) diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index 4fa240f7f9b..520f5a2f6bb 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -180,201 +180,201 @@ class SimpleStubsTest(unittest.TestCase): else: self.fail(message() + " after " + str(timeout)) - # def test_unary_unary_insecure(self): - # with _server(None) as port: - # target = f'localhost:{port}' - # response = grpc.experimental.unary_unary( - # _REQUEST, - # target, - # _UNARY_UNARY, - # channel_credentials=grpc.experimental. - # insecure_channel_credentials()) - # self.assertEqual(_REQUEST, response) - - # def test_unary_unary_secure(self): - # with _server(grpc.local_server_credentials()) as port: - # target = f'localhost:{port}' - # response = grpc.experimental.unary_unary( - # _REQUEST, - # target, - # _UNARY_UNARY, - # channel_credentials=grpc.local_channel_credentials()) - # self.assertEqual(_REQUEST, response) - - # def test_channels_cached(self): - # with _server(grpc.local_server_credentials()) as port: - # target = f'localhost:{port}' - # test_name = inspect.stack()[0][3] - # args = (_REQUEST, target, _UNARY_UNARY) - # kwargs = {"channel_credentials": grpc.local_channel_credentials()} - - # def _invoke(seed: str): - # run_kwargs = dict(kwargs) - # run_kwargs["options"] = ((test_name + seed, ""),) - # grpc.experimental.unary_unary(*args, **run_kwargs) - - # self.assert_cached(_invoke) - - # def test_channels_evicted(self): - # with _server(grpc.local_server_credentials()) as port: - # target = f'localhost:{port}' - # response = grpc.experimental.unary_unary( - # _REQUEST, - # target, - # _UNARY_UNARY, - # channel_credentials=grpc.local_channel_credentials()) - # self.assert_eventually( - # lambda: grpc._simple_stubs.ChannelCache.get( - # )._test_only_channel_count() == 0, - # message=lambda: - # f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain" - # ) - - # def test_total_channels_enforced(self): - # with _server(grpc.local_server_credentials()) as port: - # target = f'localhost:{port}' - # for i in range(_STRESS_EPOCHS): - # # Ensure we get a new channel each time. - # options = (("foo", str(i)),) - # # Send messages at full blast. - # grpc.experimental.unary_unary( - # _REQUEST, - # target, - # _UNARY_UNARY, - # options=options, - # channel_credentials=grpc.local_channel_credentials()) - # self.assert_eventually( - # lambda: grpc._simple_stubs.ChannelCache.get( - # )._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1, - # message=lambda: - # f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain" - # ) - - # def test_unary_stream(self): - # with _server(grpc.local_server_credentials()) as port: - # target = f'localhost:{port}' - # for response in grpc.experimental.unary_stream( - # _REQUEST, - # target, - # _UNARY_STREAM, - # channel_credentials=grpc.local_channel_credentials()): - # self.assertEqual(_REQUEST, response) - - # def test_stream_unary(self): - - # def request_iter(): - # for _ in range(_CLIENT_REQUEST_COUNT): - # yield _REQUEST - - # with _server(grpc.local_server_credentials()) as port: - # target = f'localhost:{port}' - # response = grpc.experimental.stream_unary( - # request_iter(), - # target, - # _STREAM_UNARY, - # channel_credentials=grpc.local_channel_credentials()) - # self.assertEqual(_REQUEST, response) - - # def test_stream_stream(self): - - # def request_iter(): - # for _ in range(_CLIENT_REQUEST_COUNT): - # yield _REQUEST - - # with _server(grpc.local_server_credentials()) as port: - # target = f'localhost:{port}' - # for response in grpc.experimental.stream_stream( - # request_iter(), - # target, - # _STREAM_STREAM, - # channel_credentials=grpc.local_channel_credentials()): - # self.assertEqual(_REQUEST, response) - - # def test_default_ssl(self): - # _private_key = resources.private_key() - # _certificate_chain = resources.certificate_chain() - # _server_certs = ((_private_key, _certificate_chain),) - # _server_host_override = 'foo.test.google.fr' - # _test_root_certificates = resources.test_root_certificates() - # _property_options = (( - # 'grpc.ssl_target_name_override', - # _server_host_override, - # ),) - # cert_dir = os.path.join(os.path.dirname(resources.__file__), - # "credentials") - # cert_file = os.path.join(cert_dir, "ca.pem") - # with _env("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", cert_file): - # server_creds = grpc.ssl_server_credentials(_server_certs) - # with _server(server_creds) as port: - # target = f'localhost:{port}' - # response = grpc.experimental.unary_unary( - # _REQUEST, target, _UNARY_UNARY, options=_property_options) - - # def test_insecure_sugar(self): - # with _server(None) as port: - # target = f'localhost:{port}' - # response = grpc.experimental.unary_unary(_REQUEST, - # target, - # _UNARY_UNARY, - # insecure=True) - # self.assertEqual(_REQUEST, response) - - # def test_insecure_sugar_mutually_exclusive(self): - # with _server(None) as port: - # target = f'localhost:{port}' - # with self.assertRaises(ValueError): - # response = grpc.experimental.unary_unary( - # _REQUEST, - # target, - # _UNARY_UNARY, - # insecure=True, - # channel_credentials=grpc.local_channel_credentials()) - - # def test_default_wait_for_ready(self): - # addr, port, sock = get_socket() - # sock.close() - # target = f'{addr}:{port}' - # channel = grpc._simple_stubs.ChannelCache.get().get_channel( - # target, (), None, True, None) - # rpc_finished_event = threading.Event() - # rpc_failed_event = threading.Event() - # server = None - - # def _on_connectivity_changed(connectivity): - # nonlocal server - # if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE: - # self.assertFalse(rpc_finished_event.is_set()) - # self.assertFalse(rpc_failed_event.is_set()) - # server = test_common.test_server() - # server.add_insecure_port(target) - # server.add_generic_rpc_handlers((_GenericHandler(),)) - # server.start() - # channel.unsubscribe(_on_connectivity_changed) - # elif connectivity in (grpc.ChannelConnectivity.IDLE, - # grpc.ChannelConnectivity.CONNECTING): - # pass - # else: - # self.fail("Encountered unknown state.") - - # channel.subscribe(_on_connectivity_changed) - - # def _send_rpc(): - # try: - # response = grpc.experimental.unary_unary(_REQUEST, - # target, - # _UNARY_UNARY, - # insecure=True) - # rpc_finished_event.set() - # except Exception as e: - # rpc_failed_event.set() - - # t = threading.Thread(target=_send_rpc) - # t.start() - # t.join() - # self.assertFalse(rpc_failed_event.is_set()) - # self.assertTrue(rpc_finished_event.is_set()) - # if server is not None: - # server.stop(None) + def test_unary_unary_insecure(self): + with _server(None) as port: + target = f'localhost:{port}' + response = grpc.experimental.unary_unary( + _REQUEST, + target, + _UNARY_UNARY, + channel_credentials=grpc.experimental. + insecure_channel_credentials()) + self.assertEqual(_REQUEST, response) + + def test_unary_unary_secure(self): + with _server(grpc.local_server_credentials()) as port: + target = f'localhost:{port}' + response = grpc.experimental.unary_unary( + _REQUEST, + target, + _UNARY_UNARY, + channel_credentials=grpc.local_channel_credentials()) + self.assertEqual(_REQUEST, response) + + def test_channels_cached(self): + with _server(grpc.local_server_credentials()) as port: + target = f'localhost:{port}' + test_name = inspect.stack()[0][3] + args = (_REQUEST, target, _UNARY_UNARY) + kwargs = {"channel_credentials": grpc.local_channel_credentials()} + + def _invoke(seed: str): + run_kwargs = dict(kwargs) + run_kwargs["options"] = ((test_name + seed, ""),) + grpc.experimental.unary_unary(*args, **run_kwargs) + + self.assert_cached(_invoke) + + def test_channels_evicted(self): + with _server(grpc.local_server_credentials()) as port: + target = f'localhost:{port}' + response = grpc.experimental.unary_unary( + _REQUEST, + target, + _UNARY_UNARY, + channel_credentials=grpc.local_channel_credentials()) + self.assert_eventually( + lambda: grpc._simple_stubs.ChannelCache.get( + )._test_only_channel_count() == 0, + message=lambda: + f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain" + ) + + def test_total_channels_enforced(self): + with _server(grpc.local_server_credentials()) as port: + target = f'localhost:{port}' + for i in range(_STRESS_EPOCHS): + # Ensure we get a new channel each time. + options = (("foo", str(i)),) + # Send messages at full blast. + grpc.experimental.unary_unary( + _REQUEST, + target, + _UNARY_UNARY, + options=options, + channel_credentials=grpc.local_channel_credentials()) + self.assert_eventually( + lambda: grpc._simple_stubs.ChannelCache.get( + )._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1, + message=lambda: + f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain" + ) + + def test_unary_stream(self): + with _server(grpc.local_server_credentials()) as port: + target = f'localhost:{port}' + for response in grpc.experimental.unary_stream( + _REQUEST, + target, + _UNARY_STREAM, + channel_credentials=grpc.local_channel_credentials()): + self.assertEqual(_REQUEST, response) + + def test_stream_unary(self): + + def request_iter(): + for _ in range(_CLIENT_REQUEST_COUNT): + yield _REQUEST + + with _server(grpc.local_server_credentials()) as port: + target = f'localhost:{port}' + response = grpc.experimental.stream_unary( + request_iter(), + target, + _STREAM_UNARY, + channel_credentials=grpc.local_channel_credentials()) + self.assertEqual(_REQUEST, response) + + def test_stream_stream(self): + + def request_iter(): + for _ in range(_CLIENT_REQUEST_COUNT): + yield _REQUEST + + with _server(grpc.local_server_credentials()) as port: + target = f'localhost:{port}' + for response in grpc.experimental.stream_stream( + request_iter(), + target, + _STREAM_STREAM, + channel_credentials=grpc.local_channel_credentials()): + self.assertEqual(_REQUEST, response) + + def test_default_ssl(self): + _private_key = resources.private_key() + _certificate_chain = resources.certificate_chain() + _server_certs = ((_private_key, _certificate_chain),) + _server_host_override = 'foo.test.google.fr' + _test_root_certificates = resources.test_root_certificates() + _property_options = (( + 'grpc.ssl_target_name_override', + _server_host_override, + ),) + cert_dir = os.path.join(os.path.dirname(resources.__file__), + "credentials") + cert_file = os.path.join(cert_dir, "ca.pem") + with _env("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", cert_file): + server_creds = grpc.ssl_server_credentials(_server_certs) + with _server(server_creds) as port: + target = f'localhost:{port}' + response = grpc.experimental.unary_unary( + _REQUEST, target, _UNARY_UNARY, options=_property_options) + + def test_insecure_sugar(self): + with _server(None) as port: + target = f'localhost:{port}' + response = grpc.experimental.unary_unary(_REQUEST, + target, + _UNARY_UNARY, + insecure=True) + self.assertEqual(_REQUEST, response) + + def test_insecure_sugar_mutually_exclusive(self): + with _server(None) as port: + target = f'localhost:{port}' + with self.assertRaises(ValueError): + response = grpc.experimental.unary_unary( + _REQUEST, + target, + _UNARY_UNARY, + insecure=True, + channel_credentials=grpc.local_channel_credentials()) + + def test_default_wait_for_ready(self): + addr, port, sock = get_socket() + sock.close() + target = f'{addr}:{port}' + channel = grpc._simple_stubs.ChannelCache.get().get_channel( + target, (), None, True, None) + rpc_finished_event = threading.Event() + rpc_failed_event = threading.Event() + server = None + + def _on_connectivity_changed(connectivity): + nonlocal server + if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE: + self.assertFalse(rpc_finished_event.is_set()) + self.assertFalse(rpc_failed_event.is_set()) + server = test_common.test_server() + server.add_insecure_port(target) + server.add_generic_rpc_handlers((_GenericHandler(),)) + server.start() + channel.unsubscribe(_on_connectivity_changed) + elif connectivity in (grpc.ChannelConnectivity.IDLE, + grpc.ChannelConnectivity.CONNECTING): + pass + else: + self.fail("Encountered unknown state.") + + channel.subscribe(_on_connectivity_changed) + + def _send_rpc(): + try: + response = grpc.experimental.unary_unary(_REQUEST, + target, + _UNARY_UNARY, + insecure=True) + rpc_finished_event.set() + except Exception as e: + rpc_failed_event.set() + + t = threading.Thread(target=_send_rpc) + t.start() + t.join() + self.assertFalse(rpc_failed_event.is_set()) + self.assertTrue(rpc_finished_event.is_set()) + if server is not None: + server.stop(None) def assert_times_out(self, invocation_args): with _server(None) as port: From 4b497f7156cca701ecd612b41d8bbfab1308789e Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 10 Jun 2020 13:08:35 -0700 Subject: [PATCH 9/9] Compensate for low test default timeout --- .../grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index 520f5a2f6bb..08d5a882eb9 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -188,7 +188,8 @@ class SimpleStubsTest(unittest.TestCase): target, _UNARY_UNARY, channel_credentials=grpc.experimental. - insecure_channel_credentials()) + insecure_channel_credentials(), + timeout=None) self.assertEqual(_REQUEST, response) def test_unary_unary_secure(self): @@ -198,7 +199,8 @@ class SimpleStubsTest(unittest.TestCase): _REQUEST, target, _UNARY_UNARY, - channel_credentials=grpc.local_channel_credentials()) + channel_credentials=grpc.local_channel_credentials(), + timeout=None) self.assertEqual(_REQUEST, response) def test_channels_cached(self): @@ -363,6 +365,7 @@ class SimpleStubsTest(unittest.TestCase): response = grpc.experimental.unary_unary(_REQUEST, target, _UNARY_UNARY, + timeout=None, insecure=True) rpc_finished_event.set() except Exception as e: