diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index fe2b9fad99b..83133f2b6ea 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -339,7 +339,7 @@ bool PrintAlphaServerFactory(const grpc::string& package_qualified_service_name, } out->Print("}\n"); out->Print( - "return implementations.server(" + "return early_adopter_implementations.server(" "\"$PackageQualifiedServiceName$\"," " method_service_descriptions, port, private_key=private_key," " certificate_chain=certificate_chain)\n", @@ -422,7 +422,7 @@ bool PrintAlphaStubFactory(const grpc::string& package_qualified_service_name, } out->Print("}\n"); out->Print( - "return implementations.stub(" + "return early_adopter_implementations.stub(" "\"$PackageQualifiedServiceName$\"," " method_invocation_descriptions, host, port," " metadata_transformer=metadata_transformer, secure=secure," @@ -586,13 +586,13 @@ bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name, "Constructor", name_and_implementation_constructor->second); } out->Print("}\n"); - out->Print("server_options = beta.server_options(" + out->Print("server_options = beta_implementations.server_options(" "request_deserializers=request_deserializers, " "response_serializers=response_serializers, " "thread_pool=pool, thread_pool_size=pool_size, " "default_timeout=default_timeout, " "maximum_timeout=maximum_timeout)\n"); - out->Print("return beta.server(method_implementations, " + out->Print("return beta_implementations.server(method_implementations, " "options=server_options)\n"); } return true; @@ -685,13 +685,13 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name, "Cardinality", name_and_cardinality->second); } out->Print("}\n"); - out->Print("stub_options = beta.stub_options(" + out->Print("stub_options = beta_implementations.stub_options(" "host=host, metadata_transformer=metadata_transformer, " "request_serializers=request_serializers, " "response_deserializers=response_deserializers, " "thread_pool=pool, thread_pool_size=pool_size)\n"); out->Print( - "return beta.dynamic_stub(channel, \'$PackageQualifiedServiceName$\', " + "return beta_implementations.dynamic_stub(channel, \'$PackageQualifiedServiceName$\', " "cardinalities, options=stub_options)\n", "PackageQualifiedServiceName", package_qualified_service_name); } @@ -701,9 +701,9 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name, bool PrintPreamble(const FileDescriptor* file, const GeneratorConfiguration& config, Printer* out) { out->Print("import abc\n"); - out->Print("from $Package$ import beta\n", + out->Print("from $Package$ import implementations as beta_implementations\n", "Package", config.beta_package_root); - out->Print("from $Package$ import implementations\n", + out->Print("from $Package$ import implementations as early_adopter_implementations\n", "Package", config.early_adopter_package_root); out->Print("from grpc.framework.alpha import utilities as alpha_utilities\n"); out->Print("from grpc.framework.common import cardinality\n"); diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py index 457ede79f2e..61674a70add 100644 --- a/src/python/grpcio/grpc/beta/_connectivity_channel.py +++ b/src/python/grpcio/grpc/beta/_connectivity_channel.py @@ -33,18 +33,24 @@ import threading import time from grpc._adapter import _low +from grpc._adapter import _types +from grpc.beta import interfaces from grpc.framework.foundation import callable_util _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 'Exception calling channel subscription callback!') +_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { + state: connectivity for state, connectivity in zip( + _types.ConnectivityState, interfaces.ChannelConnectivity) +} + class ConnectivityChannel(object): - def __init__(self, low_channel, mapping): + def __init__(self, low_channel): self._lock = threading.Lock() self._low_channel = low_channel - self._mapping = mapping self._polling = False self._connectivity = None @@ -88,7 +94,8 @@ class ConnectivityChannel(object): try_to_connect = initial_try_to_connect low_connectivity = low_channel.check_connectivity_state(try_to_connect) with self._lock: - self._connectivity = self._mapping[low_connectivity] + self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ + low_connectivity] callbacks = tuple( callback for callback, unused_but_known_to_be_none_connectivity in self._callbacks_and_connectivities) @@ -112,7 +119,8 @@ class ConnectivityChannel(object): if event.success or try_to_connect: low_connectivity = low_channel.check_connectivity_state(try_to_connect) with self._lock: - self._connectivity = self._mapping[low_connectivity] + self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ + low_connectivity] if not self._delivering: callbacks = self._deliveries(self._connectivity) if callbacks: diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py index ebf91d80ab1..daa42c475ad 100644 --- a/src/python/grpcio/grpc/beta/_server.py +++ b/src/python/grpcio/grpc/beta/_server.py @@ -72,7 +72,7 @@ def _disassemble(grpc_link, end_link, pool, event, grace): event.set() -class Server(object): +class Server(interfaces.Server): def __init__(self, grpc_link, end_link, pool): self._grpc_link = grpc_link @@ -82,9 +82,9 @@ class Server(object): def add_insecure_port(self, address): return self._grpc_link.add_port(address, None) - def add_secure_port(self, address, intermediary_low_server_credentials): + def add_secure_port(self, address, server_credentials): return self._grpc_link.add_port( - address, intermediary_low_server_credentials) + address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access def start(self): self._grpc_link.join_link(self._end_link) diff --git a/src/python/grpcio/grpc/beta/beta.py b/src/python/grpcio/grpc/beta/implementations.py similarity index 75% rename from src/python/grpcio/grpc/beta/beta.py rename to src/python/grpcio/grpc/beta/implementations.py index b3a161087f3..9b461fb3ddf 100644 --- a/src/python/grpcio/grpc/beta/beta.py +++ b/src/python/grpcio/grpc/beta/implementations.py @@ -40,6 +40,7 @@ from grpc._adapter import _types from grpc.beta import _connectivity_channel from grpc.beta import _server from grpc.beta import _stub +from grpc.beta import interfaces from grpc.framework.common import cardinality # pylint: disable=unused-import from grpc.framework.interfaces.face import face # pylint: disable=unused-import @@ -47,32 +48,6 @@ _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 'Exception calling channel subscription callback!') -@enum.unique -class ChannelConnectivity(enum.Enum): - """Mirrors grpc_connectivity_state in the gRPC Core. - - Attributes: - IDLE: The channel is idle. - CONNECTING: The channel is connecting. - READY: The channel is ready to conduct RPCs. - TRANSIENT_FAILURE: The channel has seen a failure from which it expects to - recover. - FATAL_FAILURE: The channel has seen a failure from which it cannot recover. - """ - - IDLE = (_types.ConnectivityState.IDLE, 'idle',) - CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',) - READY = (_types.ConnectivityState.READY, 'ready',) - TRANSIENT_FAILURE = ( - _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',) - FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',) - -_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { - state: connectivity for state, connectivity in zip( - _types.ConnectivityState, ChannelConnectivity) -} - - class ClientCredentials(object): """A value encapsulating the data required to create a secure Channel. @@ -118,13 +93,14 @@ class Channel(object): self._low_channel = low_channel self._intermediary_low_channel = intermediary_low_channel self._connectivity_channel = _connectivity_channel.ConnectivityChannel( - low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY) + low_channel) def subscribe(self, callback, try_to_connect=None): """Subscribes to this Channel's connectivity. Args: - callback: A callable to be invoked and passed this Channel's connectivity. + callback: A callable to be invoked and passed an + interfaces.ChannelConnectivity identifying this Channel's connectivity. The callable will be invoked immediately upon subscription and again for every change to this Channel's connectivity thereafter until it is unsubscribed. @@ -144,7 +120,7 @@ class Channel(object): self._connectivity_channel.unsubscribe(callback) -def create_insecure_channel(host, port): +def insecure_channel(host, port): """Creates an insecure Channel to a remote host. Args: @@ -159,7 +135,7 @@ def create_insecure_channel(host, port): return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access -def create_secure_channel(host, port, client_credentials): +def secure_channel(host, port, client_credentials): """Creates a secure Channel to a remote host. Args: @@ -313,86 +289,6 @@ def ssl_server_credentials( intermediary_low_credentials._internal, intermediary_low_credentials) # pylint: disable=protected-access -class Server(object): - """Services RPCs.""" - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def add_insecure_port(self, address): - """Reserves a port for insecure RPC service once this Server becomes active. - - This method may only be called before calling this Server's start method is - called. - - Args: - address: The address for which to open a port. - - Returns: - An integer port on which RPCs will be serviced after this link has been - started. This is typically the same number as the port number contained - in the passed address, but will likely be different if the port number - contained in the passed address was zero. - """ - raise NotImplementedError() - - @abc.abstractmethod - def add_secure_port(self, address, server_credentials): - """Reserves a port for secure RPC service after this Server becomes active. - - This method may only be called before calling this Server's start method is - called. - - Args: - address: The address for which to open a port. - server_credentials: A ServerCredentials. - - Returns: - An integer port on which RPCs will be serviced after this link has been - started. This is typically the same number as the port number contained - in the passed address, but will likely be different if the port number - contained in the passed address was zero. - """ - raise NotImplementedError() - - @abc.abstractmethod - def start(self): - """Starts this Server's service of RPCs. - - This method may only be called while the server is not serving RPCs (i.e. it - is not idempotent). - """ - raise NotImplementedError() - - @abc.abstractmethod - def stop(self, grace): - """Stops this Server's service of RPCs. - - All calls to this method immediately stop service of new RPCs. When existing - RPCs are aborted is controlled by the grace period parameter passed to this - method. - - This method may be called at any time and is idempotent. Passing a smaller - grace value than has been passed in a previous call will have the effect of - stopping the Server sooner. Passing a larger grace value than has been - passed in a previous call will not have the effect of stopping the sooner - later. - - Args: - grace: A duration of time in seconds to allow existing RPCs to complete - before being aborted by this Server's stopping. May be zero for - immediate abortion of all in-progress RPCs. - - Returns: - A threading.Event that will be set when this Server has completely - stopped. The returned event may not be set until after the full grace - period (if some ongoing RPC continues for the full length of the period) - of it may be set much sooner (such as if this Server had no RPCs underway - at the time it was stopped or if all RPCs that it had underway completed - very early in the grace period). - """ - raise NotImplementedError() - - class ServerOptions(object): """A value encapsulating the various options for creation of a Server. @@ -450,27 +346,8 @@ def server_options( thread_pool, thread_pool_size, default_timeout, maximum_timeout) -class _Server(Server): - - def __init__(self, underserver): - self._underserver = underserver - - def add_insecure_port(self, address): - return self._underserver.add_insecure_port(address) - - def add_secure_port(self, address, server_credentials): - return self._underserver.add_secure_port( - address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access - - def start(self): - self._underserver.start() - - def stop(self, grace): - return self._underserver.stop(grace) - - def server(service_implementations, options=None): - """Creates a Server with which RPCs can be serviced. + """Creates an interfaces.Server with which RPCs can be serviced. Args: service_implementations: A dictionary from service name-method name pair to @@ -479,13 +356,12 @@ def server(service_implementations, options=None): functionality of the returned Server. Returns: - A Server with which RPCs can be serviced. + An interfaces.Server with which RPCs can be serviced. """ effective_options = _EMPTY_SERVER_OPTIONS if options is None else options - underserver = _server.server( + return _server.server( service_implementations, effective_options.multi_method_implementation, effective_options.request_deserializers, effective_options.response_serializers, effective_options.thread_pool, effective_options.thread_pool_size, effective_options.default_timeout, effective_options.maximum_timeout) - return _Server(underserver) diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py index 79f2620dd47..07c8618f70c 100644 --- a/src/python/grpcio/grpc/beta/interfaces.py +++ b/src/python/grpcio/grpc/beta/interfaces.py @@ -32,6 +32,28 @@ import abc import enum +from grpc._adapter import _types + + +@enum.unique +class ChannelConnectivity(enum.Enum): + """Mirrors grpc_connectivity_state in the gRPC Core. + + Attributes: + IDLE: The channel is idle. + CONNECTING: The channel is connecting. + READY: The channel is ready to conduct RPCs. + TRANSIENT_FAILURE: The channel has seen a failure from which it expects to + recover. + FATAL_FAILURE: The channel has seen a failure from which it cannot recover. + """ + IDLE = (_types.ConnectivityState.IDLE, 'idle',) + CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',) + READY = (_types.ConnectivityState.READY, 'ready',) + TRANSIENT_FAILURE = ( + _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',) + FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',) + @enum.unique class StatusCode(enum.Enum): @@ -110,3 +132,83 @@ class GRPCInvocationContext(object): def disable_next_request_compression(self): """Disables compression of the next request passed by the application.""" raise NotImplementedError() + + +class Server(object): + """Services RPCs.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def add_insecure_port(self, address): + """Reserves a port for insecure RPC service once this Server becomes active. + + This method may only be called before calling this Server's start method is + called. + + Args: + address: The address for which to open a port. + + Returns: + An integer port on which RPCs will be serviced after this link has been + started. This is typically the same number as the port number contained + in the passed address, but will likely be different if the port number + contained in the passed address was zero. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_secure_port(self, address, server_credentials): + """Reserves a port for secure RPC service after this Server becomes active. + + This method may only be called before calling this Server's start method is + called. + + Args: + address: The address for which to open a port. + server_credentials: A ServerCredentials. + + Returns: + An integer port on which RPCs will be serviced after this link has been + started. This is typically the same number as the port number contained + in the passed address, but will likely be different if the port number + contained in the passed address was zero. + """ + raise NotImplementedError() + + @abc.abstractmethod + def start(self): + """Starts this Server's service of RPCs. + + This method may only be called while the server is not serving RPCs (i.e. it + is not idempotent). + """ + raise NotImplementedError() + + @abc.abstractmethod + def stop(self, grace): + """Stops this Server's service of RPCs. + + All calls to this method immediately stop service of new RPCs. When existing + RPCs are aborted is controlled by the grace period parameter passed to this + method. + + This method may be called at any time and is idempotent. Passing a smaller + grace value than has been passed in a previous call will have the effect of + stopping the Server sooner. Passing a larger grace value than has been + passed in a previous call will not have the effect of stopping the sooner + later. + + Args: + grace: A duration of time in seconds to allow existing RPCs to complete + before being aborted by this Server's stopping. May be zero for + immediate abortion of all in-progress RPCs. + + Returns: + A threading.Event that will be set when this Server has completely + stopped. The returned event may not be set until after the full grace + period (if some ongoing RPC continues for the full length of the period) + of it may be set much sooner (such as if this Server had no RPCs underway + at the time it was stopped or if all RPCs that it had underway completed + very early in the grace period). + """ + raise NotImplementedError() diff --git a/src/python/grpcio/grpc/beta/utilities.py b/src/python/grpcio/grpc/beta/utilities.py index 1b5356e3ad9..fb07a765795 100644 --- a/src/python/grpcio/grpc/beta/utilities.py +++ b/src/python/grpcio/grpc/beta/utilities.py @@ -32,7 +32,9 @@ import threading import time -from grpc.beta import beta +# implementations is referenced from specification in this module. +from grpc.beta import implementations # pylint: disable=unused-import +from grpc.beta import interfaces from grpc.framework.foundation import callable_util from grpc.framework.foundation import future @@ -70,7 +72,8 @@ class _ChannelReadyFuture(future.Future): def _update(self, connectivity): with self._condition: - if not self._cancelled and connectivity is beta.ChannelConnectivity.READY: + if (not self._cancelled and + connectivity is interfaces.ChannelConnectivity.READY): self._matured = True self._channel.unsubscribe(self._update) self._condition.notify_all() @@ -141,19 +144,19 @@ class _ChannelReadyFuture(future.Future): def channel_ready_future(channel): - """Creates a future.Future that matures when a beta.Channel is ready. + """Creates a future.Future tracking when an implementations.Channel is ready. - Cancelling the returned future.Future does not tell the given beta.Channel to - abandon attempts it may have been making to connect; cancelling merely - deactivates the return future.Future's subscription to the given - beta.Channel's connectivity. + Cancelling the returned future.Future does not tell the given + implementations.Channel to abandon attempts it may have been making to + connect; cancelling merely deactivates the return future.Future's + subscription to the given implementations.Channel's connectivity. Args: - channel: A beta.Channel. + channel: An implementations.Channel. Returns: A future.Future that matures when the given Channel has connectivity - beta.ChannelConnectivity.READY. + interfaces.ChannelConnectivity.READY. """ ready_future = _ChannelReadyFuture(channel) ready_future.start() diff --git a/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py index 4c8c64b06d4..259b978de25 100644 --- a/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py +++ b/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py @@ -42,7 +42,7 @@ import threading import time import unittest -from grpc.beta import beta +from grpc.beta import implementations from grpc.framework.foundation import future from grpc.framework.interfaces.face import face from grpc_test.framework.common import test_constants @@ -170,7 +170,7 @@ def _CreateService(test_pb2): server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer) port = server.add_insecure_port('[::]:0') server.start() - channel = beta.create_insecure_channel('localhost', port) + channel = implementations.insecure_channel('localhost', port) stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel) yield servicer_methods, stub server.stop(0) diff --git a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py index 89fe4b2acfa..fad57da9d06 100644 --- a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py @@ -32,7 +32,7 @@ import threading import unittest -from grpc.beta import beta +from grpc.beta import implementations from grpc.beta import interfaces from grpc.framework.common import cardinality from grpc.framework.interfaces.face import utilities @@ -159,20 +159,21 @@ class BetaFeaturesTest(unittest.TestCase): _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, } - server_options = beta.server_options( + server_options = implementations.server_options( thread_pool_size=test_constants.POOL_SIZE) - self._server = beta.server(method_implementations, options=server_options) - server_credentials = beta.ssl_server_credentials( + self._server = implementations.server( + method_implementations, options=server_options) + server_credentials = implementations.ssl_server_credentials( [(resources.private_key(), resources.certificate_chain(),),]) port = self._server.add_secure_port('[::]:0', server_credentials) self._server.start() - self._client_credentials = beta.ssl_client_credentials( + self._client_credentials = implementations.ssl_client_credentials( resources.test_root_certificates(), None, None) - channel = test_utilities.create_not_really_secure_channel( + channel = test_utilities.not_really_secure_channel( 'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE) - stub_options = beta.stub_options( + stub_options = implementations.stub_options( thread_pool_size=test_constants.POOL_SIZE) - self._dynamic_stub = beta.dynamic_stub( + self._dynamic_stub = implementations.dynamic_stub( channel, _GROUP, cardinalities, options=stub_options) def tearDown(self): diff --git a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py index 038464889d6..b3c05bdb0c9 100644 --- a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py @@ -36,14 +36,9 @@ import unittest from grpc._adapter import _low from grpc._adapter import _types from grpc.beta import _connectivity_channel +from grpc.beta import interfaces from grpc_test.framework.common import test_constants -_MAPPING_FUNCTION = lambda integer: integer * 200 + 17 -_MAPPING = { - state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState} -_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map( - _MAPPING_FUNCTION, _types.ConnectivityState) - def _drive_completion_queue(completion_queue): while True: @@ -84,7 +79,7 @@ class ChannelConnectivityTest(unittest.TestCase): callback = _Callback() connectivity_channel = _connectivity_channel.ConnectivityChannel( - low_channel, _MAPPING) + low_channel) connectivity_channel.subscribe(callback.update, try_to_connect=False) first_connectivities = callback.block_until_connectivities_satisfy(bool) connectivity_channel.subscribe(callback.update, try_to_connect=True) @@ -98,11 +93,16 @@ class ChannelConnectivityTest(unittest.TestCase): connectivity_channel.unsubscribe(callback.update) fifth_connectivities = callback.connectivities() - self.assertSequenceEqual((_IDLE,), first_connectivities) - self.assertNotIn(_READY, second_connectivities) - self.assertNotIn(_READY, third_connectivities) - self.assertNotIn(_READY, fourth_connectivities) - self.assertNotIn(_READY, fifth_connectivities) + self.assertSequenceEqual( + (interfaces.ChannelConnectivity.IDLE,), first_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.READY, second_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.READY, third_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.READY, fourth_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.READY, fifth_connectivities) def test_immediately_connectable_channel_connectivity(self): server_completion_queue = _low.CompletionQueue() @@ -117,7 +117,7 @@ class ChannelConnectivityTest(unittest.TestCase): second_callback = _Callback() connectivity_channel = _connectivity_channel.ConnectivityChannel( - low_channel, _MAPPING) + low_channel) connectivity_channel.subscribe(first_callback.update, try_to_connect=False) first_connectivities = first_callback.block_until_connectivities_satisfy( bool) @@ -132,9 +132,11 @@ class ChannelConnectivityTest(unittest.TestCase): bool) # Wait for a connection that will happen (or may already have happened). first_callback.block_until_connectivities_satisfy( - lambda connectivities: _READY in connectivities) + lambda connectivities: + interfaces.ChannelConnectivity.READY in connectivities) second_callback.block_until_connectivities_satisfy( - lambda connectivities: _READY in connectivities) + lambda connectivities: + interfaces.ChannelConnectivity.READY in connectivities) connectivity_channel.unsubscribe(first_callback.update) connectivity_channel.unsubscribe(second_callback.update) @@ -142,12 +144,19 @@ class ChannelConnectivityTest(unittest.TestCase): server_completion_queue.shutdown() server_completion_queue_thread.join() - self.assertSequenceEqual((_IDLE,), first_connectivities) - self.assertSequenceEqual((_IDLE,), second_connectivities) - self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities) - self.assertNotIn(_FATAL_FAILURE, third_connectivities) - self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities) - self.assertNotIn(_FATAL_FAILURE, fourth_connectivities) + self.assertSequenceEqual( + (interfaces.ChannelConnectivity.IDLE,), first_connectivities) + self.assertSequenceEqual( + (interfaces.ChannelConnectivity.IDLE,), second_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.FATAL_FAILURE, third_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.TRANSIENT_FAILURE, + fourth_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities) def test_reachable_then_unreachable_channel_connectivity(self): server_completion_queue = _low.CompletionQueue() @@ -161,14 +170,16 @@ class ChannelConnectivityTest(unittest.TestCase): callback = _Callback() connectivity_channel = _connectivity_channel.ConnectivityChannel( - low_channel, _MAPPING) + low_channel) connectivity_channel.subscribe(callback.update, try_to_connect=True) callback.block_until_connectivities_satisfy( - lambda connectivities: _READY in connectivities) + lambda connectivities: + interfaces.ChannelConnectivity.READY in connectivities) # Now take down the server and confirm that channel readiness is repudiated. server.shutdown() callback.block_until_connectivities_satisfy( - lambda connectivities: connectivities[-1] is not _READY) + lambda connectivities: + connectivities[-1] is not interfaces.ChannelConnectivity.READY) connectivity_channel.unsubscribe(callback.update) server.shutdown() diff --git a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py index e9087a79493..aa33e1e6f86 100644 --- a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py @@ -32,7 +32,7 @@ import collections import unittest -from grpc.beta import beta +from grpc.beta import implementations from grpc.beta import interfaces from grpc_test import resources from grpc_test import test_common as grpc_test_common @@ -81,25 +81,26 @@ class _Implementation(test_interfaces.Implementation): method: method_object.cardinality() for (group, method), method_object in methods.iteritems()} - server_options = beta.server_options( + server_options = implementations.server_options( request_deserializers=serialization_behaviors.request_deserializers, response_serializers=serialization_behaviors.response_serializers, thread_pool_size=test_constants.POOL_SIZE) - server = beta.server(method_implementations, options=server_options) - server_credentials = beta.ssl_server_credentials( + server = implementations.server( + method_implementations, options=server_options) + server_credentials = implementations.ssl_server_credentials( [(resources.private_key(), resources.certificate_chain(),),]) port = server.add_secure_port('[::]:0', server_credentials) server.start() - client_credentials = beta.ssl_client_credentials( + client_credentials = implementations.ssl_client_credentials( resources.test_root_certificates(), None, None) - channel = test_utilities.create_not_really_secure_channel( + channel = test_utilities.not_really_secure_channel( 'localhost', port, client_credentials, _SERVER_HOST_OVERRIDE) - stub_options = beta.stub_options( + stub_options = implementations.stub_options( request_serializers=serialization_behaviors.request_serializers, response_deserializers=serialization_behaviors.response_deserializers, thread_pool_size=test_constants.POOL_SIZE) - generic_stub = beta.generic_stub(channel, options=stub_options) - dynamic_stub = beta.dynamic_stub( + generic_stub = implementations.generic_stub(channel, options=stub_options) + dynamic_stub = implementations.dynamic_stub( channel, service, cardinalities, options=stub_options) return generic_stub, {service: dynamic_stub}, server diff --git a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py index ecd10f2175a..5feb997fef6 100644 --- a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py @@ -31,7 +31,7 @@ import unittest -from grpc.beta import beta +from grpc.beta import implementations from grpc.beta import interfaces from grpc.framework.interfaces.face import face from grpc_test.framework.common import test_constants @@ -40,10 +40,10 @@ from grpc_test.framework.common import test_constants class NotFoundTest(unittest.TestCase): def setUp(self): - self._server = beta.server({}) + self._server = implementations.server({}) port = self._server.add_insecure_port('[::]:0') - channel = beta.create_insecure_channel('localhost', port) - self._generic_stub = beta.generic_stub(channel) + channel = implementations.insecure_channel('localhost', port) + self._generic_stub = implementations.generic_stub(channel) self._server.start() def tearDown(self): diff --git a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py index 998e74ccf48..996cea91189 100644 --- a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py @@ -35,7 +35,7 @@ import unittest from grpc._adapter import _low from grpc._adapter import _types -from grpc.beta import beta +from grpc.beta import implementations from grpc.beta import utilities from grpc.framework.foundation import future from grpc_test.framework.common import test_constants @@ -69,7 +69,7 @@ class _Callback(object): class ChannelConnectivityTest(unittest.TestCase): def test_lonely_channel_connectivity(self): - channel = beta.create_insecure_channel('localhost', 12345) + channel = implementations.insecure_channel('localhost', 12345) callback = _Callback() ready_future = utilities.channel_ready_future(channel) @@ -94,7 +94,7 @@ class ChannelConnectivityTest(unittest.TestCase): server_completion_queue_thread = threading.Thread( target=_drive_completion_queue, args=(server_completion_queue,)) server_completion_queue_thread.start() - channel = beta.create_insecure_channel('localhost', port) + channel = implementations.insecure_channel('localhost', port) callback = _Callback() try: diff --git a/src/python/grpcio_test/grpc_test/beta/test_utilities.py b/src/python/grpcio_test/grpc_test/beta/test_utilities.py index 338670478d5..24a8600e12b 100644 --- a/src/python/grpcio_test/grpc_test/beta/test_utilities.py +++ b/src/python/grpcio_test/grpc_test/beta/test_utilities.py @@ -30,25 +30,27 @@ """Test-appropriate entry points into the gRPC Python Beta API.""" from grpc._adapter import _intermediary_low -from grpc.beta import beta +from grpc.beta import implementations -def create_not_really_secure_channel( +def not_really_secure_channel( host, port, client_credentials, server_host_override): """Creates an insecure Channel to a remote host. Args: host: The name of the remote host to which to connect. port: The port of the remote host to which to connect. - client_credentials: The beta.ClientCredentials with which to connect. + client_credentials: The implementations.ClientCredentials with which to + connect. server_host_override: The target name used for SSL host name checking. Returns: - A beta.Channel to the remote host through which RPCs may be conducted. + An implementations.Channel to the remote host through which RPCs may be + conducted. """ hostport = '%s:%d' % (host, port) intermediary_low_channel = _intermediary_low.Channel( hostport, client_credentials._intermediary_low_credentials, server_host_override=server_host_override) - return beta.Channel( + return implementations.Channel( intermediary_low_channel._internal, intermediary_low_channel)