From e329de88e7ba0deacc41e2c7379223029ac652a9 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 8 Jun 2020 12:22:59 -0700 Subject: [PATCH] Add failing test --- src/python/grpcio/grpc/_simple_stubs.py | 2 +- .../grpcio/grpc/experimental/__init__.py | 5 +- .../tests_py3_only/unit/_simple_stubs_test.py | 61 ++++++++++++++++++- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/src/python/grpcio/grpc/_simple_stubs.py b/src/python/grpcio/grpc/_simple_stubs.py index 2ee82ba8642..f17dc0abe03 100644 --- a/src/python/grpcio/grpc/_simple_stubs.py +++ b/src/python/grpcio/grpc/_simple_stubs.py @@ -53,7 +53,7 @@ else: def _create_channel(target: str, options: Sequence[Tuple[str, str]], channel_credentials: Optional[grpc.ChannelCredentials], compression: Optional[grpc.Compression]) -> grpc.Channel: - if channel_credentials._credentials is grpc.experimental._insecure_channel_credentials: + if channel_credentials is grpc.experimental.insecure_channel_credentials(): _LOGGER.debug(f"Creating insecure channel with options '{options}' " + f"and compression '{compression}'") return grpc.insecure_channel(target, diff --git a/src/python/grpcio/grpc/experimental/__init__.py b/src/python/grpcio/grpc/experimental/__init__.py index eb642900d9c..9b93410df23 100644 --- a/src/python/grpcio/grpc/experimental/__init__.py +++ b/src/python/grpcio/grpc/experimental/__init__.py @@ -41,7 +41,8 @@ class UsageError(Exception): """Raised by the gRPC library to indicate usage not allowed by the API.""" -_insecure_channel_credentials = object() +_insecure_channel_credentials_sentinel = object() +_insecure_channel_credentials = grpc.ChannelCredentials(_insecure_channel_credentials_sentinel) def insecure_channel_credentials(): @@ -53,7 +54,7 @@ def insecure_channel_credentials(): used with grpc.unary_unary, grpc.unary_stream, grpc.stream_unary, or grpc.stream_stream. """ - return grpc.ChannelCredentials(_insecure_channel_credentials) + return _insecure_channel_credentials class ExperimentalApiWarning(Warning): diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index cb1ca3fabf8..c1863aa8a96 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -19,19 +19,21 @@ import os _MAXIMUM_CHANNELS = 10 -os.environ["GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"] = "1" +os.environ["GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"] = "2" os.environ["GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"] = str(_MAXIMUM_CHANNELS) import contextlib import datetime import inspect import logging +import threading import unittest import sys import time from typing import Callable, Optional from tests.unit import test_common +from tests.unit.framework.common import get_socket from tests.unit import resources import grpc import grpc.experimental @@ -311,6 +313,63 @@ class SimpleStubsTest(unittest.TestCase): insecure=True, channel_credentials=grpc.local_channel_credentials()) + def test_default_wait_for_ready(self): + addr, port, sock = get_socket() + sock.close() + target = f'{addr}:{port}' + channel = grpc._simple_stubs.ChannelCache.get().get_channel(target, + (), + None, + True, + None) + rpc_finished_event = threading.Event() + rpc_failed_event = threading.Event() + server = None + + def _on_connectivity_changed(connectivity): + nonlocal server + if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE: + self.assertFalse(rpc_finished_event.is_set()) + self.assertFalse(rpc_failed_event.is_set()) + server = test_common.test_server() + server.add_insecure_port(target) + server.add_generic_rpc_handlers((_GenericHandler(),)) + server.start() + channel.unsubscribe(_on_connectivity_changed) + elif connectivity in (grpc.ChannelConnectivity.IDLE, grpc.ChannelConnectivity.CONNECTING): + pass + else: + raise AssertionError("Encountered unknown state.") + + channel.subscribe(_on_connectivity_changed) + + def _send_rpc(): + try: + response = grpc.experimental.unary_unary( + _REQUEST, + target, + _UNARY_UNARY, + # wait_for_ready=True, # remove + # timeout=30.0, + insecure=True) + rpc_finished_event.set() + except Exception as e: + import sys; sys.stderr.write(e); sys.stderr.flush() + rpc_failed_event.set() + + t = threading.Thread(target=_send_rpc) + t.start() + t.join() + self.assertFalse(rpc_failed_event.is_set()) + self.assertTrue(rpc_finished_event.is_set()) + if server is not None: + server.stop(None) + + + def test_wait_for_ready_default_set(self): + # TODO: Implement. + pass + if __name__ == "__main__": logging.basicConfig(level=logging.INFO)