Merge pull request #3093 from nathanielmanistaatgoogle/channel

The Beta API Channel
pull/3100/head
Masood Malekghassemi 10 years ago
commit 41d2b291eb
  1. 2
      src/python/grpcio/grpc/_adapter/_c/types/channel.c
  2. 28
      src/python/grpcio/grpc/beta/__init__.py
  3. 148
      src/python/grpcio/grpc/beta/_connectivity_channel.py
  4. 114
      src/python/grpcio/grpc/beta/beta.py
  5. 161
      src/python/grpcio/grpc/beta/utilities.py
  6. 30
      src/python/grpcio_test/grpc_test/beta/__init__.py
  7. 180
      src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
  8. 123
      src/python/grpcio_test/grpc_test/beta/_utilities_test.py

@ -164,7 +164,7 @@ PyObject *pygrpc_Channel_watch_connectivity_state(
int last_observed_state;
CompletionQueue *completion_queue;
char *keywords[] = {"last_observed_state", "deadline",
"completion_queue", "tag"};
"completion_queue", "tag", NULL};
if (!PyArg_ParseTupleAndKeywords(
args, kwargs, "idO!O:watch_connectivity_state", keywords,
&last_observed_state, &deadline, &pygrpc_CompletionQueue_type,

@ -0,0 +1,28 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,148 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Affords a connectivity-state-listenable channel."""
import threading
import time
from grpc._adapter import _low
from grpc.framework.foundation import callable_util
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
class ConnectivityChannel(object):
def __init__(self, low_channel, mapping):
self._lock = threading.Lock()
self._low_channel = low_channel
self._mapping = mapping
self._polling = False
self._connectivity = None
self._try_to_connect = False
self._callbacks_and_connectivities = []
self._delivering = False
def _deliveries(self, connectivity):
callbacks_needing_update = []
for callback_and_connectivity in self._callbacks_and_connectivities:
callback, callback_connectivity = callback_and_connectivity
if callback_connectivity is not connectivity:
callbacks_needing_update.append(callback)
callback_and_connectivity[1] = connectivity
return callbacks_needing_update
def _deliver(self, initial_connectivity, initial_callbacks):
connectivity = initial_connectivity
callbacks = initial_callbacks
while True:
for callback in callbacks:
callable_util.call_logging_exceptions(
callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
connectivity)
with self._lock:
callbacks = self._deliveries(self._connectivity)
if callbacks:
connectivity = self._connectivity
else:
self._delivering = False
return
def _spawn_delivery(self, connectivity, callbacks):
delivering_thread = threading.Thread(
target=self._deliver, args=(connectivity, callbacks,))
delivering_thread.start()
self._delivering = True
# TODO(issue 3064): Don't poll.
def _poll_connectivity(self, low_channel, initial_try_to_connect):
try_to_connect = initial_try_to_connect
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
self._connectivity = self._mapping[low_connectivity]
callbacks = tuple(
callback for callback, unused_but_known_to_be_none_connectivity
in self._callbacks_and_connectivities)
for callback_and_connectivity in self._callbacks_and_connectivities:
callback_and_connectivity[1] = self._connectivity
if callbacks:
self._spawn_delivery(self._connectivity, callbacks)
completion_queue = _low.CompletionQueue()
while True:
low_channel.watch_connectivity_state(
low_connectivity, time.time() + 0.2, completion_queue, None)
event = completion_queue.next()
with self._lock:
if not self._callbacks_and_connectivities and not self._try_to_connect:
self._polling = False
self._connectivity = None
completion_queue.shutdown()
break
try_to_connect = self._try_to_connect
self._try_to_connect = False
if event.success or try_to_connect:
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
self._connectivity = self._mapping[low_connectivity]
if not self._delivering:
callbacks = self._deliveries(self._connectivity)
if callbacks:
self._spawn_delivery(self._connectivity, callbacks)
def subscribe(self, callback, try_to_connect):
with self._lock:
if not self._callbacks_and_connectivities and not self._polling:
polling_thread = threading.Thread(
target=self._poll_connectivity,
args=(self._low_channel, bool(try_to_connect)))
polling_thread.start()
self._polling = True
self._callbacks_and_connectivities.append([callback, None])
elif not self._delivering and self._connectivity is not None:
self._spawn_delivery(self._connectivity, (callback,))
self._try_to_connect |= bool(try_to_connect)
self._callbacks_and_connectivities.append(
[callback, self._connectivity])
else:
self._try_to_connect |= bool(try_to_connect)
self._callbacks_and_connectivities.append([callback, None])
def unsubscribe(self, callback):
with self._lock:
for index, (subscribed_callback, unused_connectivity) in enumerate(
self._callbacks_and_connectivities):
if callback == subscribed_callback:
self._callbacks_and_connectivities.pop(index)
break
def low_channel(self):
return self._low_channel

@ -0,0 +1,114 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into gRPC Python Beta."""
import enum
from grpc._adapter import _low
from grpc._adapter import _types
from grpc.beta import _connectivity_channel
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
@enum.unique
class ChannelConnectivity(enum.Enum):
"""Mirrors grpc_connectivity_state in the gRPC Core.
Attributes:
IDLE: The channel is idle.
CONNECTING: The channel is connecting.
READY: The channel is ready to conduct RPCs.
TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
recover.
FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
"""
IDLE = (_types.ConnectivityState.IDLE, 'idle',)
CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
READY = (_types.ConnectivityState.READY, 'ready',)
TRANSIENT_FAILURE = (
_types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
state: connectivity for state, connectivity in zip(
_types.ConnectivityState, ChannelConnectivity)
}
class Channel(object):
"""A channel to a remote host through which RPCs may be conducted.
Only the "subscribe" and "unsubscribe" methods are supported for application
use. This class' instance constructor and all other attributes are
unsupported.
"""
def __init__(self, low_channel):
self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
def subscribe(self, callback, try_to_connect=None):
"""Subscribes to this Channel's connectivity.
Args:
callback: A callable to be invoked and passed this Channel's connectivity.
The callable will be invoked immediately upon subscription and again for
every change to this Channel's connectivity thereafter until it is
unsubscribed.
try_to_connect: A boolean indicating whether or not this Channel should
attempt to connect if it is not already connected and ready to conduct
RPCs.
"""
self._connectivity_channel.subscribe(callback, try_to_connect)
def unsubscribe(self, callback):
"""Unsubscribes a callback from this Channel's connectivity.
Args:
callback: A callable previously registered with this Channel from having
been passed to its "subscribe" method.
"""
self._connectivity_channel.unsubscribe(callback)
def create_insecure_channel(host, port):
"""Creates an insecure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
Returns:
A Channel to the remote host through which RPCs may be conducted.
"""
return Channel(_low.Channel('%s:%d' % (host, port), ()))

@ -0,0 +1,161 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for the gRPC Python Beta API."""
import threading
import time
from grpc.beta import beta
from grpc.framework.foundation import callable_util
from grpc.framework.foundation import future
_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
'Exception calling connectivity future "done" callback!')
class _ChannelReadyFuture(future.Future):
def __init__(self, channel):
self._condition = threading.Condition()
self._channel = channel
self._matured = False
self._cancelled = False
self._done_callbacks = []
def _block(self, timeout):
until = None if timeout is None else time.time() + timeout
with self._condition:
while True:
if self._cancelled:
raise future.CancelledError()
elif self._matured:
return
else:
if until is None:
self._condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
raise future.TimeoutError()
else:
self._condition.wait(timeout=remaining)
def _update(self, connectivity):
with self._condition:
if not self._cancelled and connectivity is beta.ChannelConnectivity.READY:
self._matured = True
self._channel.unsubscribe(self._update)
self._condition.notify_all()
done_callbacks = tuple(self._done_callbacks)
self._done_callbacks = None
else:
return
for done_callback in done_callbacks:
callable_util.call_logging_exceptions(
done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
def cancel(self):
with self._condition:
if not self._matured:
self._cancelled = True
self._channel.unsubscribe(self._update)
self._condition.notify_all()
done_callbacks = tuple(self._done_callbacks)
self._done_callbacks = None
else:
return False
for done_callback in done_callbacks:
callable_util.call_logging_exceptions(
done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
def cancelled(self):
with self._condition:
return self._cancelled
def running(self):
with self._condition:
return not self._cancelled and not self._matured
def done(self):
with self._condition:
return self._cancelled or self._matured
def result(self, timeout=None):
self._block(timeout)
return None
def exception(self, timeout=None):
self._block(timeout)
return None
def traceback(self, timeout=None):
self._block(timeout)
return None
def add_done_callback(self, fn):
with self._condition:
if not self._cancelled and not self._matured:
self._done_callbacks.append(fn)
return
fn(self)
def start(self):
with self._condition:
self._channel.subscribe(self._update, try_to_connect=True)
def __del__(self):
with self._condition:
if not self._cancelled and not self._matured:
self._channel.unsubscribe(self._update)
def channel_ready_future(channel):
"""Creates a future.Future that matures when a beta.Channel is ready.
Cancelling the returned future.Future does not tell the given beta.Channel to
abandon attempts it may have been making to connect; cancelling merely
deactivates the return future.Future's subscription to the given
beta.Channel's connectivity.
Args:
channel: A beta.Channel.
Returns:
A future.Future that matures when the given Channel has connectivity
beta.ChannelConnectivity.READY.
"""
ready_future = _ChannelReadyFuture(channel)
ready_future.start()
return ready_future

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,180 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of grpc.beta._connectivity_channel."""
import threading
import time
import unittest
from grpc._adapter import _low
from grpc._adapter import _types
from grpc.beta import _connectivity_channel
from grpc_test.framework.common import test_constants
_MAPPING_FUNCTION = lambda integer: integer * 200 + 17
_MAPPING = {
state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState}
_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map(
_MAPPING_FUNCTION, _types.ConnectivityState)
def _drive_completion_queue(completion_queue):
while True:
event = completion_queue.next(time.time() + 24 * 60 * 60)
if event.type == _types.EventType.QUEUE_SHUTDOWN:
break
class _Callback(object):
def __init__(self):
self._condition = threading.Condition()
self._connectivities = []
def update(self, connectivity):
with self._condition:
self._connectivities.append(connectivity)
self._condition.notify()
def connectivities(self):
with self._condition:
return tuple(self._connectivities)
def block_until_connectivities_satisfy(self, predicate):
with self._condition:
while True:
connectivities = tuple(self._connectivities)
if predicate(connectivities):
return connectivities
else:
self._condition.wait()
class ChannelConnectivityTest(unittest.TestCase):
def test_lonely_channel_connectivity(self):
low_channel = _low.Channel('localhost:12345', ())
callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
low_channel, _MAPPING)
connectivity_channel.subscribe(callback.update, try_to_connect=False)
first_connectivities = callback.block_until_connectivities_satisfy(bool)
connectivity_channel.subscribe(callback.update, try_to_connect=True)
second_connectivities = callback.block_until_connectivities_satisfy(
lambda connectivities: 2 <= len(connectivities))
# Wait for a connection that will never happen.
time.sleep(test_constants.SHORT_TIMEOUT)
third_connectivities = callback.connectivities()
connectivity_channel.unsubscribe(callback.update)
fourth_connectivities = callback.connectivities()
connectivity_channel.unsubscribe(callback.update)
fifth_connectivities = callback.connectivities()
self.assertSequenceEqual((_IDLE,), first_connectivities)
self.assertNotIn(_READY, second_connectivities)
self.assertNotIn(_READY, third_connectivities)
self.assertNotIn(_READY, fourth_connectivities)
self.assertNotIn(_READY, fifth_connectivities)
def test_immediately_connectable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
server = _low.Server(server_completion_queue, [])
port = server.add_http2_port('[::]:0')
server.start()
server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, args=(server_completion_queue,))
server_completion_queue_thread.start()
low_channel = _low.Channel('localhost:%d' % port, ())
first_callback = _Callback()
second_callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
low_channel, _MAPPING)
connectivity_channel.subscribe(first_callback.update, try_to_connect=False)
first_connectivities = first_callback.block_until_connectivities_satisfy(
bool)
# Wait for a connection that will never happen because try_to_connect=True
# has not yet been passed.
time.sleep(test_constants.SHORT_TIMEOUT)
second_connectivities = first_callback.connectivities()
connectivity_channel.subscribe(second_callback.update, try_to_connect=True)
third_connectivities = first_callback.block_until_connectivities_satisfy(
lambda connectivities: 2 <= len(connectivities))
fourth_connectivities = second_callback.block_until_connectivities_satisfy(
bool)
# Wait for a connection that will happen (or may already have happened).
first_callback.block_until_connectivities_satisfy(
lambda connectivities: _READY in connectivities)
second_callback.block_until_connectivities_satisfy(
lambda connectivities: _READY in connectivities)
connectivity_channel.unsubscribe(first_callback.update)
connectivity_channel.unsubscribe(second_callback.update)
server.shutdown()
server_completion_queue.shutdown()
server_completion_queue_thread.join()
self.assertSequenceEqual((_IDLE,), first_connectivities)
self.assertSequenceEqual((_IDLE,), second_connectivities)
self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities)
self.assertNotIn(_FATAL_FAILURE, third_connectivities)
self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities)
self.assertNotIn(_FATAL_FAILURE, fourth_connectivities)
def test_reachable_then_unreachable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
server = _low.Server(server_completion_queue, [])
port = server.add_http2_port('[::]:0')
server.start()
server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, args=(server_completion_queue,))
server_completion_queue_thread.start()
low_channel = _low.Channel('localhost:%d' % port, ())
callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
low_channel, _MAPPING)
connectivity_channel.subscribe(callback.update, try_to_connect=True)
callback.block_until_connectivities_satisfy(
lambda connectivities: _READY in connectivities)
# Now take down the server and confirm that channel readiness is repudiated.
server.shutdown()
callback.block_until_connectivities_satisfy(
lambda connectivities: connectivities[-1] is not _READY)
connectivity_channel.unsubscribe(callback.update)
server.shutdown()
server_completion_queue.shutdown()
server_completion_queue_thread.join()
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,123 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of grpc.beta.utilities."""
import threading
import time
import unittest
from grpc._adapter import _low
from grpc._adapter import _types
from grpc.beta import beta
from grpc.beta import utilities
from grpc.framework.foundation import future
from grpc_test.framework.common import test_constants
def _drive_completion_queue(completion_queue):
while True:
event = completion_queue.next(time.time() + 24 * 60 * 60)
if event.type == _types.EventType.QUEUE_SHUTDOWN:
break
class _Callback(object):
def __init__(self):
self._condition = threading.Condition()
self._value = None
def accept_value(self, value):
with self._condition:
self._value = value
self._condition.notify_all()
def block_until_called(self):
with self._condition:
while self._value is None:
self._condition.wait()
return self._value
class ChannelConnectivityTest(unittest.TestCase):
def test_lonely_channel_connectivity(self):
channel = beta.create_insecure_channel('localhost', 12345)
callback = _Callback()
ready_future = utilities.channel_ready_future(channel)
ready_future.add_done_callback(callback.accept_value)
with self.assertRaises(future.TimeoutError):
ready_future.result(test_constants.SHORT_TIMEOUT)
self.assertFalse(ready_future.cancelled())
self.assertFalse(ready_future.done())
self.assertTrue(ready_future.running())
ready_future.cancel()
value_passed_to_callback = callback.block_until_called()
self.assertIs(ready_future, value_passed_to_callback)
self.assertTrue(ready_future.cancelled())
self.assertTrue(ready_future.done())
self.assertFalse(ready_future.running())
def test_immediately_connectable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
server = _low.Server(server_completion_queue, [])
port = server.add_http2_port('[::]:0')
server.start()
server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, args=(server_completion_queue,))
server_completion_queue_thread.start()
channel = beta.create_insecure_channel('localhost', port)
callback = _Callback()
try:
ready_future = utilities.channel_ready_future(channel)
ready_future.add_done_callback(callback.accept_value)
self.assertIsNone(
ready_future.result(test_constants.SHORT_TIMEOUT))
value_passed_to_callback = callback.block_until_called()
self.assertIs(ready_future, value_passed_to_callback)
self.assertFalse(ready_future.cancelled())
self.assertTrue(ready_future.done())
self.assertFalse(ready_future.running())
# Cancellation after maturity has no effect.
ready_future.cancel()
self.assertFalse(ready_future.cancelled())
self.assertTrue(ready_future.done())
self.assertFalse(ready_future.running())
finally:
ready_future.cancel()
server.shutdown()
server_completion_queue.shutdown()
server_completion_queue_thread.join()
if __name__ == '__main__':
unittest.main(verbosity=2)
Loading…
Cancel
Save