mirror of https://github.com/grpc/grpc.git
commit
c31c8f3d0e
62 changed files with 2698 additions and 171 deletions
@ -0,0 +1,28 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
@ -0,0 +1,148 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Affords a connectivity-state-listenable channel.""" |
||||||
|
|
||||||
|
import threading |
||||||
|
import time |
||||||
|
|
||||||
|
from grpc._adapter import _low |
||||||
|
from grpc.framework.foundation import callable_util |
||||||
|
|
||||||
|
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( |
||||||
|
'Exception calling channel subscription callback!') |
||||||
|
|
||||||
|
|
||||||
|
class ConnectivityChannel(object): |
||||||
|
|
||||||
|
def __init__(self, low_channel, mapping): |
||||||
|
self._lock = threading.Lock() |
||||||
|
self._low_channel = low_channel |
||||||
|
self._mapping = mapping |
||||||
|
|
||||||
|
self._polling = False |
||||||
|
self._connectivity = None |
||||||
|
self._try_to_connect = False |
||||||
|
self._callbacks_and_connectivities = [] |
||||||
|
self._delivering = False |
||||||
|
|
||||||
|
def _deliveries(self, connectivity): |
||||||
|
callbacks_needing_update = [] |
||||||
|
for callback_and_connectivity in self._callbacks_and_connectivities: |
||||||
|
callback, callback_connectivity = callback_and_connectivity |
||||||
|
if callback_connectivity is not connectivity: |
||||||
|
callbacks_needing_update.append(callback) |
||||||
|
callback_and_connectivity[1] = connectivity |
||||||
|
return callbacks_needing_update |
||||||
|
|
||||||
|
def _deliver(self, initial_connectivity, initial_callbacks): |
||||||
|
connectivity = initial_connectivity |
||||||
|
callbacks = initial_callbacks |
||||||
|
while True: |
||||||
|
for callback in callbacks: |
||||||
|
callable_util.call_logging_exceptions( |
||||||
|
callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE, |
||||||
|
connectivity) |
||||||
|
with self._lock: |
||||||
|
callbacks = self._deliveries(self._connectivity) |
||||||
|
if callbacks: |
||||||
|
connectivity = self._connectivity |
||||||
|
else: |
||||||
|
self._delivering = False |
||||||
|
return |
||||||
|
|
||||||
|
def _spawn_delivery(self, connectivity, callbacks): |
||||||
|
delivering_thread = threading.Thread( |
||||||
|
target=self._deliver, args=(connectivity, callbacks,)) |
||||||
|
delivering_thread.start() |
||||||
|
self._delivering = True |
||||||
|
|
||||||
|
# TODO(issue 3064): Don't poll. |
||||||
|
def _poll_connectivity(self, low_channel, initial_try_to_connect): |
||||||
|
try_to_connect = initial_try_to_connect |
||||||
|
low_connectivity = low_channel.check_connectivity_state(try_to_connect) |
||||||
|
with self._lock: |
||||||
|
self._connectivity = self._mapping[low_connectivity] |
||||||
|
callbacks = tuple( |
||||||
|
callback for callback, unused_but_known_to_be_none_connectivity |
||||||
|
in self._callbacks_and_connectivities) |
||||||
|
for callback_and_connectivity in self._callbacks_and_connectivities: |
||||||
|
callback_and_connectivity[1] = self._connectivity |
||||||
|
if callbacks: |
||||||
|
self._spawn_delivery(self._connectivity, callbacks) |
||||||
|
completion_queue = _low.CompletionQueue() |
||||||
|
while True: |
||||||
|
low_channel.watch_connectivity_state( |
||||||
|
low_connectivity, time.time() + 0.2, completion_queue, None) |
||||||
|
event = completion_queue.next() |
||||||
|
with self._lock: |
||||||
|
if not self._callbacks_and_connectivities and not self._try_to_connect: |
||||||
|
self._polling = False |
||||||
|
self._connectivity = None |
||||||
|
completion_queue.shutdown() |
||||||
|
break |
||||||
|
try_to_connect = self._try_to_connect |
||||||
|
self._try_to_connect = False |
||||||
|
if event.success or try_to_connect: |
||||||
|
low_connectivity = low_channel.check_connectivity_state(try_to_connect) |
||||||
|
with self._lock: |
||||||
|
self._connectivity = self._mapping[low_connectivity] |
||||||
|
if not self._delivering: |
||||||
|
callbacks = self._deliveries(self._connectivity) |
||||||
|
if callbacks: |
||||||
|
self._spawn_delivery(self._connectivity, callbacks) |
||||||
|
|
||||||
|
def subscribe(self, callback, try_to_connect): |
||||||
|
with self._lock: |
||||||
|
if not self._callbacks_and_connectivities and not self._polling: |
||||||
|
polling_thread = threading.Thread( |
||||||
|
target=self._poll_connectivity, |
||||||
|
args=(self._low_channel, bool(try_to_connect))) |
||||||
|
polling_thread.start() |
||||||
|
self._polling = True |
||||||
|
self._callbacks_and_connectivities.append([callback, None]) |
||||||
|
elif not self._delivering and self._connectivity is not None: |
||||||
|
self._spawn_delivery(self._connectivity, (callback,)) |
||||||
|
self._try_to_connect |= bool(try_to_connect) |
||||||
|
self._callbacks_and_connectivities.append( |
||||||
|
[callback, self._connectivity]) |
||||||
|
else: |
||||||
|
self._try_to_connect |= bool(try_to_connect) |
||||||
|
self._callbacks_and_connectivities.append([callback, None]) |
||||||
|
|
||||||
|
def unsubscribe(self, callback): |
||||||
|
with self._lock: |
||||||
|
for index, (subscribed_callback, unused_connectivity) in enumerate( |
||||||
|
self._callbacks_and_connectivities): |
||||||
|
if callback == subscribed_callback: |
||||||
|
self._callbacks_and_connectivities.pop(index) |
||||||
|
break |
||||||
|
|
||||||
|
def low_channel(self): |
||||||
|
return self._low_channel |
@ -0,0 +1,114 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Entry points into gRPC Python Beta.""" |
||||||
|
|
||||||
|
import enum |
||||||
|
|
||||||
|
from grpc._adapter import _low |
||||||
|
from grpc._adapter import _types |
||||||
|
from grpc.beta import _connectivity_channel |
||||||
|
|
||||||
|
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( |
||||||
|
'Exception calling channel subscription callback!') |
||||||
|
|
||||||
|
|
||||||
|
@enum.unique |
||||||
|
class ChannelConnectivity(enum.Enum): |
||||||
|
"""Mirrors grpc_connectivity_state in the gRPC Core. |
||||||
|
|
||||||
|
Attributes: |
||||||
|
IDLE: The channel is idle. |
||||||
|
CONNECTING: The channel is connecting. |
||||||
|
READY: The channel is ready to conduct RPCs. |
||||||
|
TRANSIENT_FAILURE: The channel has seen a failure from which it expects to |
||||||
|
recover. |
||||||
|
FATAL_FAILURE: The channel has seen a failure from which it cannot recover. |
||||||
|
""" |
||||||
|
|
||||||
|
IDLE = (_types.ConnectivityState.IDLE, 'idle',) |
||||||
|
CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',) |
||||||
|
READY = (_types.ConnectivityState.READY, 'ready',) |
||||||
|
TRANSIENT_FAILURE = ( |
||||||
|
_types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',) |
||||||
|
FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',) |
||||||
|
|
||||||
|
_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { |
||||||
|
state: connectivity for state, connectivity in zip( |
||||||
|
_types.ConnectivityState, ChannelConnectivity) |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
class Channel(object): |
||||||
|
"""A channel to a remote host through which RPCs may be conducted. |
||||||
|
|
||||||
|
Only the "subscribe" and "unsubscribe" methods are supported for application |
||||||
|
use. This class' instance constructor and all other attributes are |
||||||
|
unsupported. |
||||||
|
""" |
||||||
|
|
||||||
|
def __init__(self, low_channel): |
||||||
|
self._connectivity_channel = _connectivity_channel.ConnectivityChannel( |
||||||
|
low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY) |
||||||
|
|
||||||
|
def subscribe(self, callback, try_to_connect=None): |
||||||
|
"""Subscribes to this Channel's connectivity. |
||||||
|
|
||||||
|
Args: |
||||||
|
callback: A callable to be invoked and passed this Channel's connectivity. |
||||||
|
The callable will be invoked immediately upon subscription and again for |
||||||
|
every change to this Channel's connectivity thereafter until it is |
||||||
|
unsubscribed. |
||||||
|
try_to_connect: A boolean indicating whether or not this Channel should |
||||||
|
attempt to connect if it is not already connected and ready to conduct |
||||||
|
RPCs. |
||||||
|
""" |
||||||
|
self._connectivity_channel.subscribe(callback, try_to_connect) |
||||||
|
|
||||||
|
def unsubscribe(self, callback): |
||||||
|
"""Unsubscribes a callback from this Channel's connectivity. |
||||||
|
|
||||||
|
Args: |
||||||
|
callback: A callable previously registered with this Channel from having |
||||||
|
been passed to its "subscribe" method. |
||||||
|
""" |
||||||
|
self._connectivity_channel.unsubscribe(callback) |
||||||
|
|
||||||
|
|
||||||
|
def create_insecure_channel(host, port): |
||||||
|
"""Creates an insecure Channel to a remote host. |
||||||
|
|
||||||
|
Args: |
||||||
|
host: The name of the remote host to which to connect. |
||||||
|
port: The port of the remote host to which to connect. |
||||||
|
|
||||||
|
Returns: |
||||||
|
A Channel to the remote host through which RPCs may be conducted. |
||||||
|
""" |
||||||
|
return Channel(_low.Channel('%s:%d' % (host, port), ())) |
@ -0,0 +1,161 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Utilities for the gRPC Python Beta API.""" |
||||||
|
|
||||||
|
import threading |
||||||
|
import time |
||||||
|
|
||||||
|
from grpc.beta import beta |
||||||
|
from grpc.framework.foundation import callable_util |
||||||
|
from grpc.framework.foundation import future |
||||||
|
|
||||||
|
_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( |
||||||
|
'Exception calling connectivity future "done" callback!') |
||||||
|
|
||||||
|
|
||||||
|
class _ChannelReadyFuture(future.Future): |
||||||
|
|
||||||
|
def __init__(self, channel): |
||||||
|
self._condition = threading.Condition() |
||||||
|
self._channel = channel |
||||||
|
|
||||||
|
self._matured = False |
||||||
|
self._cancelled = False |
||||||
|
self._done_callbacks = [] |
||||||
|
|
||||||
|
def _block(self, timeout): |
||||||
|
until = None if timeout is None else time.time() + timeout |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._cancelled: |
||||||
|
raise future.CancelledError() |
||||||
|
elif self._matured: |
||||||
|
return |
||||||
|
else: |
||||||
|
if until is None: |
||||||
|
self._condition.wait() |
||||||
|
else: |
||||||
|
remaining = until - time.time() |
||||||
|
if remaining < 0: |
||||||
|
raise future.TimeoutError() |
||||||
|
else: |
||||||
|
self._condition.wait(timeout=remaining) |
||||||
|
|
||||||
|
def _update(self, connectivity): |
||||||
|
with self._condition: |
||||||
|
if not self._cancelled and connectivity is beta.ChannelConnectivity.READY: |
||||||
|
self._matured = True |
||||||
|
self._channel.unsubscribe(self._update) |
||||||
|
self._condition.notify_all() |
||||||
|
done_callbacks = tuple(self._done_callbacks) |
||||||
|
self._done_callbacks = None |
||||||
|
else: |
||||||
|
return |
||||||
|
|
||||||
|
for done_callback in done_callbacks: |
||||||
|
callable_util.call_logging_exceptions( |
||||||
|
done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) |
||||||
|
|
||||||
|
def cancel(self): |
||||||
|
with self._condition: |
||||||
|
if not self._matured: |
||||||
|
self._cancelled = True |
||||||
|
self._channel.unsubscribe(self._update) |
||||||
|
self._condition.notify_all() |
||||||
|
done_callbacks = tuple(self._done_callbacks) |
||||||
|
self._done_callbacks = None |
||||||
|
else: |
||||||
|
return False |
||||||
|
|
||||||
|
for done_callback in done_callbacks: |
||||||
|
callable_util.call_logging_exceptions( |
||||||
|
done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) |
||||||
|
|
||||||
|
def cancelled(self): |
||||||
|
with self._condition: |
||||||
|
return self._cancelled |
||||||
|
|
||||||
|
def running(self): |
||||||
|
with self._condition: |
||||||
|
return not self._cancelled and not self._matured |
||||||
|
|
||||||
|
def done(self): |
||||||
|
with self._condition: |
||||||
|
return self._cancelled or self._matured |
||||||
|
|
||||||
|
def result(self, timeout=None): |
||||||
|
self._block(timeout) |
||||||
|
return None |
||||||
|
|
||||||
|
def exception(self, timeout=None): |
||||||
|
self._block(timeout) |
||||||
|
return None |
||||||
|
|
||||||
|
def traceback(self, timeout=None): |
||||||
|
self._block(timeout) |
||||||
|
return None |
||||||
|
|
||||||
|
def add_done_callback(self, fn): |
||||||
|
with self._condition: |
||||||
|
if not self._cancelled and not self._matured: |
||||||
|
self._done_callbacks.append(fn) |
||||||
|
return |
||||||
|
|
||||||
|
fn(self) |
||||||
|
|
||||||
|
def start(self): |
||||||
|
with self._condition: |
||||||
|
self._channel.subscribe(self._update, try_to_connect=True) |
||||||
|
|
||||||
|
def __del__(self): |
||||||
|
with self._condition: |
||||||
|
if not self._cancelled and not self._matured: |
||||||
|
self._channel.unsubscribe(self._update) |
||||||
|
|
||||||
|
|
||||||
|
def channel_ready_future(channel): |
||||||
|
"""Creates a future.Future that matures when a beta.Channel is ready. |
||||||
|
|
||||||
|
Cancelling the returned future.Future does not tell the given beta.Channel to |
||||||
|
abandon attempts it may have been making to connect; cancelling merely |
||||||
|
deactivates the return future.Future's subscription to the given |
||||||
|
beta.Channel's connectivity. |
||||||
|
|
||||||
|
Args: |
||||||
|
channel: A beta.Channel. |
||||||
|
|
||||||
|
Returns: |
||||||
|
A future.Future that matures when the given Channel has connectivity |
||||||
|
beta.ChannelConnectivity.READY. |
||||||
|
""" |
||||||
|
ready_future = _ChannelReadyFuture(channel) |
||||||
|
ready_future.start() |
||||||
|
return ready_future |
||||||
|
|
@ -0,0 +1,30 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
|
@ -0,0 +1,204 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Utility functions for invoking RPCs.""" |
||||||
|
|
||||||
|
from grpc.framework.crust import _control |
||||||
|
from grpc.framework.interfaces.base import utilities |
||||||
|
from grpc.framework.interfaces.face import face |
||||||
|
|
||||||
|
_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!' |
||||||
|
|
||||||
|
_EMPTY_COMPLETION = utilities.completion(None, None, None) |
||||||
|
|
||||||
|
|
||||||
|
def _invoke(end, group, method, timeout, initial_metadata, payload, complete): |
||||||
|
rendezvous = _control.Rendezvous(None, None) |
||||||
|
operation_context, operator = end.operate( |
||||||
|
group, method, utilities.full_subscription(rendezvous), timeout, |
||||||
|
initial_metadata=initial_metadata, payload=payload, |
||||||
|
completion=_EMPTY_COMPLETION if complete else None) |
||||||
|
rendezvous.set_operator_and_context(operator, operation_context) |
||||||
|
outcome = operation_context.add_termination_callback(rendezvous.set_outcome) |
||||||
|
if outcome is not None: |
||||||
|
rendezvous.set_outcome(outcome) |
||||||
|
return rendezvous, operation_context, outcome |
||||||
|
|
||||||
|
|
||||||
|
def _event_return_unary( |
||||||
|
receiver, abortion_callback, rendezvous, operation_context, outcome, pool): |
||||||
|
if outcome is None: |
||||||
|
def in_pool(): |
||||||
|
abortion = rendezvous.add_abortion_callback(abortion_callback) |
||||||
|
if abortion is None: |
||||||
|
try: |
||||||
|
receiver.initial_metadata(rendezvous.initial_metadata()) |
||||||
|
receiver.response(next(rendezvous)) |
||||||
|
receiver.complete( |
||||||
|
rendezvous.terminal_metadata(), rendezvous.code(), |
||||||
|
rendezvous.details()) |
||||||
|
except face.AbortionError: |
||||||
|
pass |
||||||
|
else: |
||||||
|
abortion_callback(abortion) |
||||||
|
pool.submit(_control.pool_wrap(in_pool, operation_context)) |
||||||
|
return rendezvous |
||||||
|
|
||||||
|
|
||||||
|
def _event_return_stream( |
||||||
|
receiver, abortion_callback, rendezvous, operation_context, outcome, pool): |
||||||
|
if outcome is None: |
||||||
|
def in_pool(): |
||||||
|
abortion = rendezvous.add_abortion_callback(abortion_callback) |
||||||
|
if abortion is None: |
||||||
|
try: |
||||||
|
receiver.initial_metadata(rendezvous.initial_metadata()) |
||||||
|
for response in rendezvous: |
||||||
|
receiver.response(response) |
||||||
|
receiver.complete( |
||||||
|
rendezvous.terminal_metadata(), rendezvous.code(), |
||||||
|
rendezvous.details()) |
||||||
|
except face.AbortionError: |
||||||
|
pass |
||||||
|
else: |
||||||
|
abortion_callback(abortion) |
||||||
|
pool.submit(_control.pool_wrap(in_pool, operation_context)) |
||||||
|
return rendezvous |
||||||
|
|
||||||
|
|
||||||
|
def blocking_unary_unary( |
||||||
|
end, group, method, timeout, with_call, initial_metadata, payload): |
||||||
|
"""Services in a blocking fashion a unary-unary servicer method.""" |
||||||
|
rendezvous, unused_operation_context, unused_outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, payload, True) |
||||||
|
if with_call: |
||||||
|
return next(rendezvous, rendezvous) |
||||||
|
else: |
||||||
|
return next(rendezvous) |
||||||
|
|
||||||
|
|
||||||
|
def future_unary_unary(end, group, method, timeout, initial_metadata, payload): |
||||||
|
"""Services a value-in value-out servicer method by returning a Future.""" |
||||||
|
rendezvous, unused_operation_context, unused_outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, payload, True) |
||||||
|
return rendezvous |
||||||
|
|
||||||
|
|
||||||
|
def inline_unary_stream(end, group, method, timeout, initial_metadata, payload): |
||||||
|
"""Services a value-in stream-out servicer method.""" |
||||||
|
rendezvous, unused_operation_context, unused_outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, payload, True) |
||||||
|
return rendezvous |
||||||
|
|
||||||
|
|
||||||
|
def blocking_stream_unary( |
||||||
|
end, group, method, timeout, with_call, initial_metadata, payload_iterator, |
||||||
|
pool): |
||||||
|
"""Services in a blocking fashion a stream-in value-out servicer method.""" |
||||||
|
rendezvous, operation_context, outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, None, False) |
||||||
|
if outcome is None: |
||||||
|
def in_pool(): |
||||||
|
for payload in payload_iterator: |
||||||
|
rendezvous.consume(payload) |
||||||
|
rendezvous.terminate() |
||||||
|
pool.submit(_control.pool_wrap(in_pool, operation_context)) |
||||||
|
if with_call: |
||||||
|
return next(rendezvous), rendezvous |
||||||
|
else: |
||||||
|
return next(rendezvous) |
||||||
|
else: |
||||||
|
if with_call: |
||||||
|
return next(rendezvous), rendezvous |
||||||
|
else: |
||||||
|
return next(rendezvous) |
||||||
|
|
||||||
|
|
||||||
|
def future_stream_unary( |
||||||
|
end, group, method, timeout, initial_metadata, payload_iterator, pool): |
||||||
|
"""Services a stream-in value-out servicer method by returning a Future.""" |
||||||
|
rendezvous, operation_context, outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, None, False) |
||||||
|
if outcome is None: |
||||||
|
def in_pool(): |
||||||
|
for payload in payload_iterator: |
||||||
|
rendezvous.consume(payload) |
||||||
|
rendezvous.terminate() |
||||||
|
pool.submit(_control.pool_wrap(in_pool, operation_context)) |
||||||
|
return rendezvous |
||||||
|
|
||||||
|
|
||||||
|
def inline_stream_stream( |
||||||
|
end, group, method, timeout, initial_metadata, payload_iterator, pool): |
||||||
|
"""Services a stream-in stream-out servicer method.""" |
||||||
|
rendezvous, operation_context, outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, None, False) |
||||||
|
if outcome is None: |
||||||
|
def in_pool(): |
||||||
|
for payload in payload_iterator: |
||||||
|
rendezvous.consume(payload) |
||||||
|
rendezvous.terminate() |
||||||
|
pool.submit(_control.pool_wrap(in_pool, operation_context)) |
||||||
|
return rendezvous |
||||||
|
|
||||||
|
|
||||||
|
def event_unary_unary( |
||||||
|
end, group, method, timeout, initial_metadata, payload, receiver, |
||||||
|
abortion_callback, pool): |
||||||
|
rendezvous, operation_context, outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, payload, True) |
||||||
|
return _event_return_unary( |
||||||
|
receiver, abortion_callback, rendezvous, operation_context, outcome, pool) |
||||||
|
|
||||||
|
|
||||||
|
def event_unary_stream( |
||||||
|
end, group, method, timeout, initial_metadata, payload, |
||||||
|
receiver, abortion_callback, pool): |
||||||
|
rendezvous, operation_context, outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, payload, True) |
||||||
|
return _event_return_stream( |
||||||
|
receiver, abortion_callback, rendezvous, operation_context, outcome, pool) |
||||||
|
|
||||||
|
|
||||||
|
def event_stream_unary( |
||||||
|
end, group, method, timeout, initial_metadata, receiver, abortion_callback, |
||||||
|
pool): |
||||||
|
rendezvous, operation_context, outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, None, False) |
||||||
|
return _event_return_unary( |
||||||
|
receiver, abortion_callback, rendezvous, operation_context, outcome, pool) |
||||||
|
|
||||||
|
|
||||||
|
def event_stream_stream( |
||||||
|
end, group, method, timeout, initial_metadata, receiver, abortion_callback, |
||||||
|
pool): |
||||||
|
rendezvous, operation_context, outcome = _invoke( |
||||||
|
end, group, method, timeout, initial_metadata, None, False) |
||||||
|
return _event_return_stream( |
||||||
|
receiver, abortion_callback, rendezvous, operation_context, outcome, pool) |
@ -0,0 +1,545 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""State and behavior for translating between sync and async control flow.""" |
||||||
|
|
||||||
|
import collections |
||||||
|
import enum |
||||||
|
import sys |
||||||
|
import threading |
||||||
|
import time |
||||||
|
|
||||||
|
from grpc.framework.foundation import abandonment |
||||||
|
from grpc.framework.foundation import callable_util |
||||||
|
from grpc.framework.foundation import future |
||||||
|
from grpc.framework.foundation import stream |
||||||
|
from grpc.framework.interfaces.base import base |
||||||
|
from grpc.framework.interfaces.base import utilities |
||||||
|
from grpc.framework.interfaces.face import face |
||||||
|
|
||||||
|
_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!' |
||||||
|
_INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Crust) Internal Error! )-:' |
||||||
|
|
||||||
|
_CANNOT_SET_INITIAL_METADATA = ( |
||||||
|
'Could not set initial metadata - has it already been set, or has a ' + |
||||||
|
'payload already been sent?') |
||||||
|
_CANNOT_SET_TERMINAL_METADATA = ( |
||||||
|
'Could not set terminal metadata - has it already been set, or has RPC ' + |
||||||
|
'completion already been indicated?') |
||||||
|
_CANNOT_SET_CODE = ( |
||||||
|
'Could not set code - has it already been set, or has RPC completion ' + |
||||||
|
'already been indicated?') |
||||||
|
_CANNOT_SET_DETAILS = ( |
||||||
|
'Could not set details - has it already been set, or has RPC completion ' + |
||||||
|
'already been indicated?') |
||||||
|
|
||||||
|
|
||||||
|
class _DummyOperator(base.Operator): |
||||||
|
|
||||||
|
def advance( |
||||||
|
self, initial_metadata=None, payload=None, completion=None, |
||||||
|
allowance=None): |
||||||
|
pass |
||||||
|
|
||||||
|
_DUMMY_OPERATOR = _DummyOperator() |
||||||
|
|
||||||
|
|
||||||
|
class _Awaited( |
||||||
|
collections.namedtuple('_Awaited', ('kind', 'value',))): |
||||||
|
|
||||||
|
@enum.unique |
||||||
|
class Kind(enum.Enum): |
||||||
|
NOT_YET_ARRIVED = 'not yet arrived' |
||||||
|
ARRIVED = 'arrived' |
||||||
|
|
||||||
|
_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None) |
||||||
|
_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None) |
||||||
|
|
||||||
|
|
||||||
|
class _Transitory( |
||||||
|
collections.namedtuple('_Transitory', ('kind', 'value',))): |
||||||
|
|
||||||
|
@enum.unique |
||||||
|
class Kind(enum.Enum): |
||||||
|
NOT_YET_SEEN = 'not yet seen' |
||||||
|
PRESENT = 'present' |
||||||
|
GONE = 'gone' |
||||||
|
|
||||||
|
_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None) |
||||||
|
_GONE = _Transitory(_Transitory.Kind.GONE, None) |
||||||
|
|
||||||
|
|
||||||
|
class _Termination( |
||||||
|
collections.namedtuple( |
||||||
|
'_Termination', ('terminated', 'abortion', 'abortion_error',))): |
||||||
|
"""Values indicating whether and how an RPC has terminated. |
||||||
|
|
||||||
|
Attributes: |
||||||
|
terminated: A boolean indicating whether or not the RPC has terminated. |
||||||
|
abortion: A face.Abortion value describing the RPC's abortion or None if the |
||||||
|
RPC did not abort. |
||||||
|
abortion_error: A face.AbortionError describing the RPC's abortion or None |
||||||
|
if the RPC did not abort. |
||||||
|
""" |
||||||
|
|
||||||
|
_NOT_TERMINATED = _Termination(False, None, None) |
||||||
|
|
||||||
|
_OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR = { |
||||||
|
base.Outcome.COMPLETED: lambda *unused_args: _Termination(True, None, None), |
||||||
|
base.Outcome.CANCELLED: lambda *args: _Termination( |
||||||
|
True, face.Abortion(face.Abortion.Kind.CANCELLED, *args), |
||||||
|
face.CancellationError(*args)), |
||||||
|
base.Outcome.EXPIRED: lambda *args: _Termination( |
||||||
|
True, face.Abortion(face.Abortion.Kind.EXPIRED, *args), |
||||||
|
face.ExpirationError(*args)), |
||||||
|
base.Outcome.LOCAL_SHUTDOWN: lambda *args: _Termination( |
||||||
|
True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args), |
||||||
|
face.LocalShutdownError(*args)), |
||||||
|
base.Outcome.REMOTE_SHUTDOWN: lambda *args: _Termination( |
||||||
|
True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args), |
||||||
|
face.RemoteShutdownError(*args)), |
||||||
|
base.Outcome.RECEPTION_FAILURE: lambda *args: _Termination( |
||||||
|
True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args), |
||||||
|
face.NetworkError(*args)), |
||||||
|
base.Outcome.TRANSMISSION_FAILURE: lambda *args: _Termination( |
||||||
|
True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args), |
||||||
|
face.NetworkError(*args)), |
||||||
|
base.Outcome.LOCAL_FAILURE: lambda *args: _Termination( |
||||||
|
True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args), |
||||||
|
face.LocalError(*args)), |
||||||
|
base.Outcome.REMOTE_FAILURE: lambda *args: _Termination( |
||||||
|
True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args), |
||||||
|
face.RemoteError(*args)), |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
def _wait_once_until(condition, until): |
||||||
|
if until is None: |
||||||
|
condition.wait() |
||||||
|
else: |
||||||
|
remaining = until - time.time() |
||||||
|
if remaining < 0: |
||||||
|
raise future.TimeoutError() |
||||||
|
else: |
||||||
|
condition.wait(timeout=remaining) |
||||||
|
|
||||||
|
|
||||||
|
def _done_callback_as_operation_termination_callback( |
||||||
|
done_callback, rendezvous): |
||||||
|
def operation_termination_callback(operation_outcome): |
||||||
|
rendezvous.set_outcome(operation_outcome) |
||||||
|
done_callback(rendezvous) |
||||||
|
return operation_termination_callback |
||||||
|
|
||||||
|
|
||||||
|
def _abortion_callback_as_operation_termination_callback( |
||||||
|
rpc_abortion_callback, rendezvous_set_outcome): |
||||||
|
def operation_termination_callback(operation_outcome): |
||||||
|
termination = rendezvous_set_outcome(operation_outcome) |
||||||
|
if termination.abortion is not None: |
||||||
|
rpc_abortion_callback(termination.abortion) |
||||||
|
return operation_termination_callback |
||||||
|
|
||||||
|
|
||||||
|
class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call): |
||||||
|
"""A rendez-vous for the threads of an operation. |
||||||
|
|
||||||
|
Instances of this object present iterator and stream.Consumer interfaces for |
||||||
|
interacting with application code and present a base.Operator interface and |
||||||
|
maintain a base.Operator internally for interacting with base interface code. |
||||||
|
""" |
||||||
|
|
||||||
|
def __init__(self, operator, operation_context): |
||||||
|
self._condition = threading.Condition() |
||||||
|
|
||||||
|
self._operator = operator |
||||||
|
self._operation_context = operation_context |
||||||
|
|
||||||
|
self._up_initial_metadata = _NOT_YET_ARRIVED |
||||||
|
self._up_payload = None |
||||||
|
self._up_allowance = 1 |
||||||
|
self._up_completion = _NOT_YET_ARRIVED |
||||||
|
self._down_initial_metadata = _NOT_YET_SEEN |
||||||
|
self._down_payload = None |
||||||
|
self._down_allowance = 1 |
||||||
|
self._down_terminal_metadata = _NOT_YET_SEEN |
||||||
|
self._down_code = _NOT_YET_SEEN |
||||||
|
self._down_details = _NOT_YET_SEEN |
||||||
|
|
||||||
|
self._termination = _NOT_TERMINATED |
||||||
|
|
||||||
|
# The semantics of future.Future.cancel and future.Future.cancelled are |
||||||
|
# slightly wonky, so they have to be tracked separately from the rest of the |
||||||
|
# result of the RPC. This field tracks whether cancellation was requested |
||||||
|
# prior to termination of the RPC |
||||||
|
self._cancelled = False |
||||||
|
|
||||||
|
def set_operator_and_context(self, operator, operation_context): |
||||||
|
with self._condition: |
||||||
|
self._operator = operator |
||||||
|
self._operation_context = operation_context |
||||||
|
|
||||||
|
def _down_completion(self): |
||||||
|
if self._down_terminal_metadata.kind is _Transitory.Kind.NOT_YET_SEEN: |
||||||
|
terminal_metadata = None |
||||||
|
self._down_terminal_metadata = _GONE |
||||||
|
elif self._down_terminal_metadata.kind is _Transitory.Kind.PRESENT: |
||||||
|
terminal_metadata = self._down_terminal_metadata.value |
||||||
|
self._down_terminal_metadata = _GONE |
||||||
|
else: |
||||||
|
terminal_metadata = None |
||||||
|
if self._down_code.kind is _Transitory.Kind.NOT_YET_SEEN: |
||||||
|
code = None |
||||||
|
self._down_code = _GONE |
||||||
|
elif self._down_code.kind is _Transitory.Kind.PRESENT: |
||||||
|
code = self._down_code.value |
||||||
|
self._down_code = _GONE |
||||||
|
else: |
||||||
|
code = None |
||||||
|
if self._down_details.kind is _Transitory.Kind.NOT_YET_SEEN: |
||||||
|
details = None |
||||||
|
self._down_details = _GONE |
||||||
|
elif self._down_details.kind is _Transitory.Kind.PRESENT: |
||||||
|
details = self._down_details.value |
||||||
|
self._down_details = _GONE |
||||||
|
else: |
||||||
|
details = None |
||||||
|
return utilities.completion(terminal_metadata, code, details) |
||||||
|
|
||||||
|
def _set_outcome(self, outcome): |
||||||
|
if not self._termination.terminated: |
||||||
|
self._operator = _DUMMY_OPERATOR |
||||||
|
self._operation_context = None |
||||||
|
self._down_initial_metadata = _GONE |
||||||
|
self._down_payload = None |
||||||
|
self._down_terminal_metadata = _GONE |
||||||
|
self._down_code = _GONE |
||||||
|
self._down_details = _GONE |
||||||
|
|
||||||
|
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
||||||
|
initial_metadata = None |
||||||
|
else: |
||||||
|
initial_metadata = self._up_initial_metadata.value |
||||||
|
if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
||||||
|
terminal_metadata, code, details = None, None, None |
||||||
|
else: |
||||||
|
terminal_metadata = self._up_completion.value.terminal_metadata |
||||||
|
code = self._up_completion.value.code |
||||||
|
details = self._up_completion.value.message |
||||||
|
self._termination = _OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR[ |
||||||
|
outcome](initial_metadata, terminal_metadata, code, details) |
||||||
|
|
||||||
|
self._condition.notify_all() |
||||||
|
|
||||||
|
return self._termination |
||||||
|
|
||||||
|
def advance( |
||||||
|
self, initial_metadata=None, payload=None, completion=None, |
||||||
|
allowance=None): |
||||||
|
with self._condition: |
||||||
|
if initial_metadata is not None: |
||||||
|
self._up_initial_metadata = _Awaited( |
||||||
|
_Awaited.Kind.ARRIVED, initial_metadata) |
||||||
|
if payload is not None: |
||||||
|
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
||||||
|
self._up_initial_metadata = _ARRIVED_AND_NONE |
||||||
|
self._up_payload = payload |
||||||
|
self._up_allowance -= 1 |
||||||
|
if completion is not None: |
||||||
|
if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
||||||
|
self._up_initial_metadata = _ARRIVED_AND_NONE |
||||||
|
self._up_completion = _Awaited( |
||||||
|
_Awaited.Kind.ARRIVED, completion) |
||||||
|
if allowance is not None: |
||||||
|
if self._down_payload is not None: |
||||||
|
self._operator.advance(payload=self._down_payload) |
||||||
|
self._down_payload = None |
||||||
|
self._down_allowance += allowance - 1 |
||||||
|
else: |
||||||
|
self._down_allowance += allowance |
||||||
|
self._condition.notify_all() |
||||||
|
|
||||||
|
def cancel(self): |
||||||
|
with self._condition: |
||||||
|
if self._operation_context is not None: |
||||||
|
self._operation_context.cancel() |
||||||
|
self._cancelled = True |
||||||
|
return False |
||||||
|
|
||||||
|
def cancelled(self): |
||||||
|
with self._condition: |
||||||
|
return self._cancelled |
||||||
|
|
||||||
|
def running(self): |
||||||
|
with self._condition: |
||||||
|
return not self._termination.terminated |
||||||
|
|
||||||
|
def done(self): |
||||||
|
with self._condition: |
||||||
|
return self._termination.terminated |
||||||
|
|
||||||
|
def result(self, timeout=None): |
||||||
|
until = None if timeout is None else time.time() + timeout |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._termination.terminated: |
||||||
|
if self._termination.abortion is None: |
||||||
|
return self._up_payload |
||||||
|
elif self._termination.abortion.kind is face.Abortion.Kind.CANCELLED: |
||||||
|
raise future.CancelledError() |
||||||
|
else: |
||||||
|
raise self._termination.abortion_error # pylint: disable=raising-bad-type |
||||||
|
else: |
||||||
|
_wait_once_until(self._condition, until) |
||||||
|
|
||||||
|
def exception(self, timeout=None): |
||||||
|
until = None if timeout is None else time.time() + timeout |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._termination.terminated: |
||||||
|
if self._termination.abortion is None: |
||||||
|
return None |
||||||
|
else: |
||||||
|
return self._termination.abortion_error |
||||||
|
else: |
||||||
|
_wait_once_until(self._condition, until) |
||||||
|
|
||||||
|
def traceback(self, timeout=None): |
||||||
|
until = None if timeout is None else time.time() + timeout |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._termination.terminated: |
||||||
|
if self._termination.abortion_error is None: |
||||||
|
return None |
||||||
|
else: |
||||||
|
abortion_error = self._termination.abortion_error |
||||||
|
break |
||||||
|
else: |
||||||
|
_wait_once_until(self._condition, until) |
||||||
|
|
||||||
|
try: |
||||||
|
raise abortion_error |
||||||
|
except face.AbortionError: |
||||||
|
return sys.exc_info()[2] |
||||||
|
|
||||||
|
def add_done_callback(self, fn): |
||||||
|
with self._condition: |
||||||
|
if self._operation_context is not None: |
||||||
|
outcome = self._operation_context.add_termination_callback( |
||||||
|
_done_callback_as_operation_termination_callback(fn, self)) |
||||||
|
if outcome is None: |
||||||
|
return |
||||||
|
else: |
||||||
|
self._set_outcome(outcome) |
||||||
|
|
||||||
|
fn(self) |
||||||
|
|
||||||
|
def consume(self, value): |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._termination.terminated: |
||||||
|
return |
||||||
|
elif 0 < self._down_allowance: |
||||||
|
self._operator.advance(payload=value) |
||||||
|
self._down_allowance -= 1 |
||||||
|
return |
||||||
|
else: |
||||||
|
self._condition.wait() |
||||||
|
|
||||||
|
def terminate(self): |
||||||
|
with self._condition: |
||||||
|
if self._termination.terminated: |
||||||
|
return |
||||||
|
elif self._down_code.kind is _Transitory.Kind.GONE: |
||||||
|
# Conform to specified idempotence of terminate by ignoring extra calls. |
||||||
|
return |
||||||
|
else: |
||||||
|
completion = self._down_completion() |
||||||
|
self._operator.advance(completion=completion) |
||||||
|
|
||||||
|
def consume_and_terminate(self, value): |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._termination.terminated: |
||||||
|
return |
||||||
|
elif 0 < self._down_allowance: |
||||||
|
completion = self._down_completion() |
||||||
|
self._operator.advance(payload=value, completion=completion) |
||||||
|
return |
||||||
|
else: |
||||||
|
self._condition.wait() |
||||||
|
|
||||||
|
def __iter__(self): |
||||||
|
return self |
||||||
|
|
||||||
|
def next(self): |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._termination.abortion_error is not None: |
||||||
|
raise self._termination.abortion_error |
||||||
|
elif self._up_payload is not None: |
||||||
|
payload = self._up_payload |
||||||
|
self._up_payload = None |
||||||
|
if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED: |
||||||
|
self._operator.advance(allowance=1) |
||||||
|
return payload |
||||||
|
elif self._up_completion.kind is _Awaited.Kind.ARRIVED: |
||||||
|
raise StopIteration() |
||||||
|
else: |
||||||
|
self._condition.wait() |
||||||
|
|
||||||
|
def is_active(self): |
||||||
|
with self._condition: |
||||||
|
return not self._termination.terminated |
||||||
|
|
||||||
|
def time_remaining(self): |
||||||
|
if self._operation_context is None: |
||||||
|
return 0 |
||||||
|
else: |
||||||
|
return self._operation_context.time_remaining() |
||||||
|
|
||||||
|
def add_abortion_callback(self, abortion_callback): |
||||||
|
with self._condition: |
||||||
|
if self._operation_context is None: |
||||||
|
return self._termination.abortion |
||||||
|
else: |
||||||
|
outcome = self._operation_context.add_termination_callback( |
||||||
|
_abortion_callback_as_operation_termination_callback( |
||||||
|
abortion_callback, self.set_outcome)) |
||||||
|
if outcome is not None: |
||||||
|
return self._set_outcome(outcome).abortion |
||||||
|
else: |
||||||
|
return self._termination.abortion |
||||||
|
|
||||||
|
def initial_metadata(self): |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._up_initial_metadata.kind is _Awaited.Kind.ARRIVED: |
||||||
|
return self._up_initial_metadata.value |
||||||
|
elif self._termination.terminated: |
||||||
|
return None |
||||||
|
else: |
||||||
|
self._condition.wait() |
||||||
|
|
||||||
|
def terminal_metadata(self): |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._up_completion.kind is _Awaited.Kind.ARRIVED: |
||||||
|
return self._up_completion.value.terminal_metadata |
||||||
|
elif self._termination.terminated: |
||||||
|
return None |
||||||
|
else: |
||||||
|
self._condition.wait() |
||||||
|
|
||||||
|
def code(self): |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._up_completion.kind is _Awaited.Kind.ARRIVED: |
||||||
|
return self._up_completion.value.code |
||||||
|
elif self._termination.terminated: |
||||||
|
return None |
||||||
|
else: |
||||||
|
self._condition.wait() |
||||||
|
|
||||||
|
def details(self): |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
if self._up_completion.kind is _Awaited.Kind.ARRIVED: |
||||||
|
return self._up_completion.value.message |
||||||
|
elif self._termination.terminated: |
||||||
|
return None |
||||||
|
else: |
||||||
|
self._condition.wait() |
||||||
|
|
||||||
|
def set_initial_metadata(self, initial_metadata): |
||||||
|
with self._condition: |
||||||
|
if (self._down_initial_metadata.kind is not |
||||||
|
_Transitory.Kind.NOT_YET_SEEN): |
||||||
|
raise ValueError(_CANNOT_SET_INITIAL_METADATA) |
||||||
|
else: |
||||||
|
self._down_initial_metadata = _GONE |
||||||
|
self._operator.advance(initial_metadata=initial_metadata) |
||||||
|
|
||||||
|
def set_terminal_metadata(self, terminal_metadata): |
||||||
|
with self._condition: |
||||||
|
if (self._down_terminal_metadata.kind is not |
||||||
|
_Transitory.Kind.NOT_YET_SEEN): |
||||||
|
raise ValueError(_CANNOT_SET_TERMINAL_METADATA) |
||||||
|
else: |
||||||
|
self._down_terminal_metadata = _Transitory( |
||||||
|
_Transitory.Kind.PRESENT, terminal_metadata) |
||||||
|
|
||||||
|
def set_code(self, code): |
||||||
|
with self._condition: |
||||||
|
if self._down_code.kind is not _Transitory.Kind.NOT_YET_SEEN: |
||||||
|
raise ValueError(_CANNOT_SET_CODE) |
||||||
|
else: |
||||||
|
self._down_code = _Transitory(_Transitory.Kind.PRESENT, code) |
||||||
|
|
||||||
|
def set_details(self, details): |
||||||
|
with self._condition: |
||||||
|
if self._down_details.kind is not _Transitory.Kind.NOT_YET_SEEN: |
||||||
|
raise ValueError(_CANNOT_SET_DETAILS) |
||||||
|
else: |
||||||
|
self._down_details = _Transitory(_Transitory.Kind.PRESENT, details) |
||||||
|
|
||||||
|
def set_outcome(self, outcome): |
||||||
|
with self._condition: |
||||||
|
return self._set_outcome(outcome) |
||||||
|
|
||||||
|
|
||||||
|
def pool_wrap(behavior, operation_context): |
||||||
|
"""Wraps an operation-related behavior so that it may be called in a pool. |
||||||
|
|
||||||
|
Args: |
||||||
|
behavior: A callable related to carrying out an operation. |
||||||
|
operation_context: A base_interfaces.OperationContext for the operation. |
||||||
|
|
||||||
|
Returns: |
||||||
|
A callable that when called carries out the behavior of the given callable |
||||||
|
and handles whatever exceptions it raises appropriately. |
||||||
|
""" |
||||||
|
def translation(*args): |
||||||
|
try: |
||||||
|
behavior(*args) |
||||||
|
except ( |
||||||
|
abandonment.Abandoned, |
||||||
|
face.CancellationError, |
||||||
|
face.ExpirationError, |
||||||
|
face.LocalShutdownError, |
||||||
|
face.RemoteShutdownError, |
||||||
|
face.NetworkError, |
||||||
|
face.RemoteError, |
||||||
|
) as e: |
||||||
|
if operation_context.outcome() is None: |
||||||
|
operation_context.fail(e) |
||||||
|
except Exception as e: |
||||||
|
operation_context.fail(e) |
||||||
|
return callable_util.with_exceptions_logged( |
||||||
|
translation, _INTERNAL_ERROR_LOG_MESSAGE) |
@ -0,0 +1,166 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Behaviors for servicing RPCs.""" |
||||||
|
|
||||||
|
from grpc.framework.crust import _control |
||||||
|
from grpc.framework.foundation import abandonment |
||||||
|
from grpc.framework.interfaces.base import utilities |
||||||
|
from grpc.framework.interfaces.face import face |
||||||
|
|
||||||
|
|
||||||
|
class _ServicerContext(face.ServicerContext): |
||||||
|
|
||||||
|
def __init__(self, rendezvous): |
||||||
|
self._rendezvous = rendezvous |
||||||
|
|
||||||
|
def is_active(self): |
||||||
|
return self._rendezvous.is_active() |
||||||
|
|
||||||
|
def time_remaining(self): |
||||||
|
return self._rendezvous.time_remaining() |
||||||
|
|
||||||
|
def add_abortion_callback(self, abortion_callback): |
||||||
|
return self._rendezvous.add_abortion_callback(abortion_callback) |
||||||
|
|
||||||
|
def cancel(self): |
||||||
|
self._rendezvous.cancel() |
||||||
|
|
||||||
|
def invocation_metadata(self): |
||||||
|
return self._rendezvous.initial_metadata() |
||||||
|
|
||||||
|
def initial_metadata(self, initial_metadata): |
||||||
|
self._rendezvous.set_initial_metadata(initial_metadata) |
||||||
|
|
||||||
|
def terminal_metadata(self, terminal_metadata): |
||||||
|
self._rendezvous.set_terminal_metadata(terminal_metadata) |
||||||
|
|
||||||
|
def code(self, code): |
||||||
|
self._rendezvous.set_code(code) |
||||||
|
|
||||||
|
def details(self, details): |
||||||
|
self._rendezvous.set_details(details) |
||||||
|
|
||||||
|
|
||||||
|
def _adaptation(pool, in_pool): |
||||||
|
def adaptation(operator, operation_context): |
||||||
|
rendezvous = _control.Rendezvous(operator, operation_context) |
||||||
|
outcome = operation_context.add_termination_callback(rendezvous.set_outcome) |
||||||
|
if outcome is None: |
||||||
|
pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous) |
||||||
|
return utilities.full_subscription(rendezvous) |
||||||
|
else: |
||||||
|
raise abandonment.Abandoned() |
||||||
|
return adaptation |
||||||
|
|
||||||
|
|
||||||
|
def adapt_inline_unary_unary(method, pool): |
||||||
|
def in_pool(rendezvous): |
||||||
|
request = next(rendezvous) |
||||||
|
response = method(request, _ServicerContext(rendezvous)) |
||||||
|
rendezvous.consume_and_terminate(response) |
||||||
|
return _adaptation(pool, in_pool) |
||||||
|
|
||||||
|
|
||||||
|
def adapt_inline_unary_stream(method, pool): |
||||||
|
def in_pool(rendezvous): |
||||||
|
request = next(rendezvous) |
||||||
|
response_iterator = method(request, _ServicerContext(rendezvous)) |
||||||
|
for response in response_iterator: |
||||||
|
rendezvous.consume(response) |
||||||
|
rendezvous.terminate() |
||||||
|
return _adaptation(pool, in_pool) |
||||||
|
|
||||||
|
|
||||||
|
def adapt_inline_stream_unary(method, pool): |
||||||
|
def in_pool(rendezvous): |
||||||
|
response = method(rendezvous, _ServicerContext(rendezvous)) |
||||||
|
rendezvous.consume_and_terminate(response) |
||||||
|
return _adaptation(pool, in_pool) |
||||||
|
|
||||||
|
|
||||||
|
def adapt_inline_stream_stream(method, pool): |
||||||
|
def in_pool(rendezvous): |
||||||
|
response_iterator = method(rendezvous, _ServicerContext(rendezvous)) |
||||||
|
for response in response_iterator: |
||||||
|
rendezvous.consume(response) |
||||||
|
rendezvous.terminate() |
||||||
|
return _adaptation(pool, in_pool) |
||||||
|
|
||||||
|
|
||||||
|
def adapt_event_unary_unary(method, pool): |
||||||
|
def in_pool(rendezvous): |
||||||
|
request = next(rendezvous) |
||||||
|
method( |
||||||
|
request, rendezvous.consume_and_terminate, _ServicerContext(rendezvous)) |
||||||
|
return _adaptation(pool, in_pool) |
||||||
|
|
||||||
|
|
||||||
|
def adapt_event_unary_stream(method, pool): |
||||||
|
def in_pool(rendezvous): |
||||||
|
request = next(rendezvous) |
||||||
|
method(request, rendezvous, _ServicerContext(rendezvous)) |
||||||
|
return _adaptation(pool, in_pool) |
||||||
|
|
||||||
|
|
||||||
|
def adapt_event_stream_unary(method, pool): |
||||||
|
def in_pool(rendezvous): |
||||||
|
request_consumer = method( |
||||||
|
rendezvous.consume_and_terminate, _ServicerContext(rendezvous)) |
||||||
|
for request in rendezvous: |
||||||
|
request_consumer.consume(request) |
||||||
|
request_consumer.terminate() |
||||||
|
return _adaptation(pool, in_pool) |
||||||
|
|
||||||
|
|
||||||
|
def adapt_event_stream_stream(method, pool): |
||||||
|
def in_pool(rendezvous): |
||||||
|
request_consumer = method(rendezvous, _ServicerContext(rendezvous)) |
||||||
|
for request in rendezvous: |
||||||
|
request_consumer.consume(request) |
||||||
|
request_consumer.terminate() |
||||||
|
return _adaptation(pool, in_pool) |
||||||
|
|
||||||
|
|
||||||
|
def adapt_multi_method(multi_method, pool): |
||||||
|
def adaptation(group, method, operator, operation_context): |
||||||
|
rendezvous = _control.Rendezvous(operator, operation_context) |
||||||
|
outcome = operation_context.add_termination_callback(rendezvous.set_outcome) |
||||||
|
if outcome is None: |
||||||
|
def in_pool(): |
||||||
|
request_consumer = multi_method( |
||||||
|
group, method, rendezvous, _ServicerContext(rendezvous)) |
||||||
|
for request in rendezvous: |
||||||
|
request_consumer.consume(request) |
||||||
|
request_consumer.terminate() |
||||||
|
pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous) |
||||||
|
return utilities.full_subscription(rendezvous) |
||||||
|
else: |
||||||
|
raise abandonment.Abandoned() |
||||||
|
return adaptation |
@ -0,0 +1,352 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Entry points into the Crust layer of RPC Framework.""" |
||||||
|
|
||||||
|
from grpc.framework.common import cardinality |
||||||
|
from grpc.framework.common import style |
||||||
|
from grpc.framework.crust import _calls |
||||||
|
from grpc.framework.crust import _service |
||||||
|
from grpc.framework.interfaces.base import base |
||||||
|
from grpc.framework.interfaces.face import face |
||||||
|
|
||||||
|
|
||||||
|
class _BaseServicer(base.Servicer): |
||||||
|
|
||||||
|
def __init__(self, adapted_methods, adapted_multi_method): |
||||||
|
self._adapted_methods = adapted_methods |
||||||
|
self._adapted_multi_method = adapted_multi_method |
||||||
|
|
||||||
|
def service(self, group, method, context, output_operator): |
||||||
|
adapted_method = self._adapted_methods.get((group, method), None) |
||||||
|
if adapted_method is not None: |
||||||
|
return adapted_method(output_operator, context) |
||||||
|
elif self._adapted_multi_method is not None: |
||||||
|
try: |
||||||
|
return self._adapted_multi_method.service( |
||||||
|
group, method, output_operator, context) |
||||||
|
except face.NoSuchMethodError: |
||||||
|
raise base.NoSuchMethodError() |
||||||
|
else: |
||||||
|
raise base.NoSuchMethodError() |
||||||
|
|
||||||
|
|
||||||
|
class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable): |
||||||
|
|
||||||
|
def __init__(self, end, group, method, pool): |
||||||
|
self._end = end |
||||||
|
self._group = group |
||||||
|
self._method = method |
||||||
|
self._pool = pool |
||||||
|
|
||||||
|
def __call__( |
||||||
|
self, request, timeout, metadata=None, with_call=False): |
||||||
|
return _calls.blocking_unary_unary( |
||||||
|
self._end, self._group, self._method, timeout, with_call, |
||||||
|
metadata, request) |
||||||
|
|
||||||
|
def future(self, request, timeout, metadata=None): |
||||||
|
return _calls.future_unary_unary( |
||||||
|
self._end, self._group, self._method, timeout, metadata, |
||||||
|
request) |
||||||
|
|
||||||
|
def event( |
||||||
|
self, request, receiver, abortion_callback, timeout, |
||||||
|
metadata=None): |
||||||
|
return _calls.event_unary_unary( |
||||||
|
self._end, self._group, self._method, timeout, metadata, |
||||||
|
request, receiver, abortion_callback, self._pool) |
||||||
|
|
||||||
|
|
||||||
|
class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable): |
||||||
|
|
||||||
|
def __init__(self, end, group, method, pool): |
||||||
|
self._end = end |
||||||
|
self._group = group |
||||||
|
self._method = method |
||||||
|
self._pool = pool |
||||||
|
|
||||||
|
def __call__(self, request, timeout, metadata=None): |
||||||
|
return _calls.inline_unary_stream( |
||||||
|
self._end, self._group, self._method, timeout, metadata, |
||||||
|
request) |
||||||
|
|
||||||
|
def event( |
||||||
|
self, request, receiver, abortion_callback, timeout, |
||||||
|
metadata=None): |
||||||
|
return _calls.event_unary_stream( |
||||||
|
self._end, self._group, self._method, timeout, metadata, |
||||||
|
request, receiver, abortion_callback, self._pool) |
||||||
|
|
||||||
|
|
||||||
|
class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable): |
||||||
|
|
||||||
|
def __init__(self, end, group, method, pool): |
||||||
|
self._end = end |
||||||
|
self._group = group |
||||||
|
self._method = method |
||||||
|
self._pool = pool |
||||||
|
|
||||||
|
def __call__( |
||||||
|
self, request_iterator, timeout, metadata=None, |
||||||
|
with_call=False): |
||||||
|
return _calls.blocking_stream_unary( |
||||||
|
self._end, self._group, self._method, timeout, with_call, |
||||||
|
metadata, request_iterator, self._pool) |
||||||
|
|
||||||
|
def future(self, request_iterator, timeout, metadata=None): |
||||||
|
return _calls.future_stream_unary( |
||||||
|
self._end, self._group, self._method, timeout, metadata, |
||||||
|
request_iterator, self._pool) |
||||||
|
|
||||||
|
def event( |
||||||
|
self, receiver, abortion_callback, timeout, metadata=None): |
||||||
|
return _calls.event_stream_unary( |
||||||
|
self._end, self._group, self._method, timeout, metadata, |
||||||
|
receiver, abortion_callback, self._pool) |
||||||
|
|
||||||
|
|
||||||
|
class _StreamStreamMultiCallable(face.StreamStreamMultiCallable): |
||||||
|
|
||||||
|
def __init__(self, end, group, method, pool): |
||||||
|
self._end = end |
||||||
|
self._group = group |
||||||
|
self._method = method |
||||||
|
self._pool = pool |
||||||
|
|
||||||
|
def __call__(self, request_iterator, timeout, metadata=None): |
||||||
|
return _calls.inline_stream_stream( |
||||||
|
self._end, self._group, self._method, timeout, metadata, |
||||||
|
request_iterator, self._pool) |
||||||
|
|
||||||
|
def event( |
||||||
|
self, receiver, abortion_callback, timeout, metadata=None): |
||||||
|
return _calls.event_stream_stream( |
||||||
|
self._end, self._group, self._method, timeout, metadata, |
||||||
|
receiver, abortion_callback, self._pool) |
||||||
|
|
||||||
|
|
||||||
|
class _GenericStub(face.GenericStub): |
||||||
|
"""An face.GenericStub implementation.""" |
||||||
|
|
||||||
|
def __init__(self, end, pool): |
||||||
|
self._end = end |
||||||
|
self._pool = pool |
||||||
|
|
||||||
|
def blocking_unary_unary( |
||||||
|
self, group, method, request, timeout, metadata=None, |
||||||
|
with_call=None): |
||||||
|
return _calls.blocking_unary_unary( |
||||||
|
self._end, group, method, timeout, with_call, metadata, |
||||||
|
request) |
||||||
|
|
||||||
|
def future_unary_unary( |
||||||
|
self, group, method, request, timeout, metadata=None): |
||||||
|
return _calls.future_unary_unary( |
||||||
|
self._end, group, method, timeout, metadata, request) |
||||||
|
|
||||||
|
def inline_unary_stream( |
||||||
|
self, group, method, request, timeout, metadata=None): |
||||||
|
return _calls.inline_unary_stream( |
||||||
|
self._end, group, method, timeout, metadata, request) |
||||||
|
|
||||||
|
def blocking_stream_unary( |
||||||
|
self, group, method, request_iterator, timeout, metadata=None, |
||||||
|
with_call=None): |
||||||
|
return _calls.blocking_stream_unary( |
||||||
|
self._end, group, method, timeout, with_call, metadata, |
||||||
|
request_iterator, self._pool) |
||||||
|
|
||||||
|
def future_stream_unary( |
||||||
|
self, group, method, request_iterator, timeout, metadata=None): |
||||||
|
return _calls.future_stream_unary( |
||||||
|
self._end, group, method, timeout, metadata, |
||||||
|
request_iterator, self._pool) |
||||||
|
|
||||||
|
def inline_stream_stream( |
||||||
|
self, group, method, request_iterator, timeout, metadata=None): |
||||||
|
return _calls.inline_stream_stream( |
||||||
|
self._end, group, method, timeout, metadata, |
||||||
|
request_iterator, self._pool) |
||||||
|
|
||||||
|
def event_unary_unary( |
||||||
|
self, group, method, request, receiver, abortion_callback, timeout, |
||||||
|
metadata=None): |
||||||
|
return _calls.event_unary_unary( |
||||||
|
self._end, group, method, timeout, metadata, request, |
||||||
|
receiver, abortion_callback, self._pool) |
||||||
|
|
||||||
|
def event_unary_stream( |
||||||
|
self, group, method, request, receiver, abortion_callback, timeout, |
||||||
|
metadata=None): |
||||||
|
return _calls.event_unary_stream( |
||||||
|
self._end, group, method, timeout, metadata, request, |
||||||
|
receiver, abortion_callback, self._pool) |
||||||
|
|
||||||
|
def event_stream_unary( |
||||||
|
self, group, method, receiver, abortion_callback, timeout, |
||||||
|
metadata=None): |
||||||
|
return _calls.event_stream_unary( |
||||||
|
self._end, group, method, timeout, metadata, receiver, |
||||||
|
abortion_callback, self._pool) |
||||||
|
|
||||||
|
def event_stream_stream( |
||||||
|
self, group, method, receiver, abortion_callback, timeout, |
||||||
|
metadata=None): |
||||||
|
return _calls.event_stream_stream( |
||||||
|
self._end, group, method, timeout, metadata, receiver, |
||||||
|
abortion_callback, self._pool) |
||||||
|
|
||||||
|
def unary_unary(self, group, method): |
||||||
|
return _UnaryUnaryMultiCallable(self._end, group, method, self._pool) |
||||||
|
|
||||||
|
def unary_stream(self, group, method): |
||||||
|
return _UnaryStreamMultiCallable(self._end, group, method, self._pool) |
||||||
|
|
||||||
|
def stream_unary(self, group, method): |
||||||
|
return _StreamUnaryMultiCallable(self._end, group, method, self._pool) |
||||||
|
|
||||||
|
def stream_stream(self, group, method): |
||||||
|
return _StreamStreamMultiCallable(self._end, group, method, self._pool) |
||||||
|
|
||||||
|
|
||||||
|
class _DynamicStub(face.DynamicStub): |
||||||
|
"""An face.DynamicStub implementation.""" |
||||||
|
|
||||||
|
def __init__(self, end, group, cardinalities, pool): |
||||||
|
self._end = end |
||||||
|
self._group = group |
||||||
|
self._cardinalities = cardinalities |
||||||
|
self._pool = pool |
||||||
|
|
||||||
|
def __getattr__(self, attr): |
||||||
|
method_cardinality = self._cardinalities.get(attr) |
||||||
|
if method_cardinality is cardinality.Cardinality.UNARY_UNARY: |
||||||
|
return _UnaryUnaryMultiCallable(self._end, self._group, attr, self._pool) |
||||||
|
elif method_cardinality is cardinality.Cardinality.UNARY_STREAM: |
||||||
|
return _UnaryStreamMultiCallable(self._end, self._group, attr, self._pool) |
||||||
|
elif method_cardinality is cardinality.Cardinality.STREAM_UNARY: |
||||||
|
return _StreamUnaryMultiCallable(self._end, self._group, attr, self._pool) |
||||||
|
elif method_cardinality is cardinality.Cardinality.STREAM_STREAM: |
||||||
|
return _StreamStreamMultiCallable( |
||||||
|
self._end, self._group, attr, self._pool) |
||||||
|
else: |
||||||
|
raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr) |
||||||
|
|
||||||
|
|
||||||
|
def _adapt_method_implementations(method_implementations, pool): |
||||||
|
adapted_implementations = {} |
||||||
|
for name, method_implementation in method_implementations.iteritems(): |
||||||
|
if method_implementation.style is style.Service.INLINE: |
||||||
|
if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: |
||||||
|
adapted_implementations[name] = _service.adapt_inline_unary_unary( |
||||||
|
method_implementation.unary_unary_inline, pool) |
||||||
|
elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: |
||||||
|
adapted_implementations[name] = _service.adapt_inline_unary_stream( |
||||||
|
method_implementation.unary_stream_inline, pool) |
||||||
|
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: |
||||||
|
adapted_implementations[name] = _service.adapt_inline_stream_unary( |
||||||
|
method_implementation.stream_unary_inline, pool) |
||||||
|
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: |
||||||
|
adapted_implementations[name] = _service.adapt_inline_stream_stream( |
||||||
|
method_implementation.stream_stream_inline, pool) |
||||||
|
elif method_implementation.style is style.Service.EVENT: |
||||||
|
if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: |
||||||
|
adapted_implementations[name] = _service.adapt_event_unary_unary( |
||||||
|
method_implementation.unary_unary_event, pool) |
||||||
|
elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: |
||||||
|
adapted_implementations[name] = _service.adapt_event_unary_stream( |
||||||
|
method_implementation.unary_stream_event, pool) |
||||||
|
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: |
||||||
|
adapted_implementations[name] = _service.adapt_event_stream_unary( |
||||||
|
method_implementation.stream_unary_event, pool) |
||||||
|
elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: |
||||||
|
adapted_implementations[name] = _service.adapt_event_stream_stream( |
||||||
|
method_implementation.stream_stream_event, pool) |
||||||
|
return adapted_implementations |
||||||
|
|
||||||
|
|
||||||
|
def servicer(method_implementations, multi_method_implementation, pool): |
||||||
|
"""Creates a base.Servicer. |
||||||
|
|
||||||
|
It is guaranteed that any passed face.MultiMethodImplementation will |
||||||
|
only be called to service an RPC if there is no |
||||||
|
face.MethodImplementation for the RPC method in the passed |
||||||
|
method_implementations dictionary. |
||||||
|
|
||||||
|
Args: |
||||||
|
method_implementations: A dictionary from RPC method name to |
||||||
|
face.MethodImplementation object to be used to service the named |
||||||
|
RPC method. |
||||||
|
multi_method_implementation: An face.MultiMethodImplementation to be |
||||||
|
used to service any RPCs not serviced by the |
||||||
|
face.MethodImplementations given in the method_implementations |
||||||
|
dictionary, or None. |
||||||
|
pool: A thread pool. |
||||||
|
|
||||||
|
Returns: |
||||||
|
A base.Servicer that services RPCs via the given implementations. |
||||||
|
""" |
||||||
|
adapted_implementations = _adapt_method_implementations( |
||||||
|
method_implementations, pool) |
||||||
|
adapted_multi_method_implementation = _service.adapt_multi_method( |
||||||
|
multi_method_implementation, pool) |
||||||
|
return _BaseServicer( |
||||||
|
adapted_implementations, adapted_multi_method_implementation) |
||||||
|
|
||||||
|
|
||||||
|
def generic_stub(end, pool): |
||||||
|
"""Creates an face.GenericStub. |
||||||
|
|
||||||
|
Args: |
||||||
|
end: A base.End. |
||||||
|
pool: A futures.ThreadPoolExecutor. |
||||||
|
|
||||||
|
Returns: |
||||||
|
A face.GenericStub that performs RPCs via the given base.End. |
||||||
|
""" |
||||||
|
return _GenericStub(end, pool) |
||||||
|
|
||||||
|
|
||||||
|
def dynamic_stub(end, group, cardinalities, pool): |
||||||
|
"""Creates an face.DynamicStub. |
||||||
|
|
||||||
|
Args: |
||||||
|
end: A base.End. |
||||||
|
group: The group identifier for all RPCs to be made with the created |
||||||
|
face.DynamicStub. |
||||||
|
cardinalities: A dict from method identifier to cardinality.Cardinality |
||||||
|
value identifying the cardinality of every RPC method to be supported by |
||||||
|
the created face.DynamicStub. |
||||||
|
pool: A futures.ThreadPoolExecutor. |
||||||
|
|
||||||
|
Returns: |
||||||
|
A face.DynamicStub that performs RPCs via the given base.End. |
||||||
|
""" |
||||||
|
return _DynamicStub(end, group, cardinalities, pool) |
@ -0,0 +1,160 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Tests Face compliance of the crust-over-core-over-gRPC-links stack.""" |
||||||
|
|
||||||
|
import collections |
||||||
|
import unittest |
||||||
|
|
||||||
|
from grpc._adapter import _intermediary_low |
||||||
|
from grpc._links import invocation |
||||||
|
from grpc._links import service |
||||||
|
from grpc.framework.core import implementations as core_implementations |
||||||
|
from grpc.framework.crust import implementations as crust_implementations |
||||||
|
from grpc.framework.foundation import logging_pool |
||||||
|
from grpc.framework.interfaces.links import utilities |
||||||
|
from grpc_test import test_common |
||||||
|
from grpc_test.framework.common import test_constants |
||||||
|
from grpc_test.framework.interfaces.face import test_cases |
||||||
|
from grpc_test.framework.interfaces.face import test_interfaces |
||||||
|
from grpc_test.framework.interfaces.links import test_utilities |
||||||
|
|
||||||
|
|
||||||
|
class _SerializationBehaviors( |
||||||
|
collections.namedtuple( |
||||||
|
'_SerializationBehaviors', |
||||||
|
('request_serializers', 'request_deserializers', 'response_serializers', |
||||||
|
'response_deserializers',))): |
||||||
|
pass |
||||||
|
|
||||||
|
|
||||||
|
def _serialization_behaviors_from_test_methods(test_methods): |
||||||
|
request_serializers = {} |
||||||
|
request_deserializers = {} |
||||||
|
response_serializers = {} |
||||||
|
response_deserializers = {} |
||||||
|
for (group, method), test_method in test_methods.iteritems(): |
||||||
|
request_serializers[group, method] = test_method.serialize_request |
||||||
|
request_deserializers[group, method] = test_method.deserialize_request |
||||||
|
response_serializers[group, method] = test_method.serialize_response |
||||||
|
response_deserializers[group, method] = test_method.deserialize_response |
||||||
|
return _SerializationBehaviors( |
||||||
|
request_serializers, request_deserializers, response_serializers, |
||||||
|
response_deserializers) |
||||||
|
|
||||||
|
|
||||||
|
class _Implementation(test_interfaces.Implementation): |
||||||
|
|
||||||
|
def instantiate( |
||||||
|
self, methods, method_implementations, multi_method_implementation): |
||||||
|
pool = logging_pool.pool(test_constants.POOL_SIZE) |
||||||
|
servicer = crust_implementations.servicer( |
||||||
|
method_implementations, multi_method_implementation, pool) |
||||||
|
serialization_behaviors = _serialization_behaviors_from_test_methods( |
||||||
|
methods) |
||||||
|
invocation_end_link = core_implementations.invocation_end_link() |
||||||
|
service_end_link = core_implementations.service_end_link( |
||||||
|
servicer, test_constants.DEFAULT_TIMEOUT, |
||||||
|
test_constants.MAXIMUM_TIMEOUT) |
||||||
|
service_grpc_link = service.service_link( |
||||||
|
serialization_behaviors.request_deserializers, |
||||||
|
serialization_behaviors.response_serializers) |
||||||
|
port = service_grpc_link.add_port(0, None) |
||||||
|
channel = _intermediary_low.Channel('localhost:%d' % port, None) |
||||||
|
invocation_grpc_link = invocation.invocation_link( |
||||||
|
channel, b'localhost', |
||||||
|
serialization_behaviors.request_serializers, |
||||||
|
serialization_behaviors.response_deserializers) |
||||||
|
|
||||||
|
invocation_end_link.join_link(invocation_grpc_link) |
||||||
|
invocation_grpc_link.join_link(invocation_end_link) |
||||||
|
service_grpc_link.join_link(service_end_link) |
||||||
|
service_end_link.join_link(service_grpc_link) |
||||||
|
service_end_link.start() |
||||||
|
invocation_end_link.start() |
||||||
|
invocation_grpc_link.start() |
||||||
|
service_grpc_link.start() |
||||||
|
|
||||||
|
generic_stub = crust_implementations.generic_stub(invocation_end_link, pool) |
||||||
|
# TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. |
||||||
|
group = next(iter(methods))[0] |
||||||
|
# TODO(nathaniel): Add a "cardinalities_by_group" attribute to |
||||||
|
# _digest.TestServiceDigest. |
||||||
|
cardinalities = { |
||||||
|
method: method_object.cardinality() |
||||||
|
for (group, method), method_object in methods.iteritems()} |
||||||
|
dynamic_stub = crust_implementations.dynamic_stub( |
||||||
|
invocation_end_link, group, cardinalities, pool) |
||||||
|
|
||||||
|
return generic_stub, {group: dynamic_stub}, ( |
||||||
|
invocation_end_link, invocation_grpc_link, service_grpc_link, |
||||||
|
service_end_link, pool) |
||||||
|
|
||||||
|
def destantiate(self, memo): |
||||||
|
(invocation_end_link, invocation_grpc_link, service_grpc_link, |
||||||
|
service_end_link, pool) = memo |
||||||
|
invocation_end_link.stop(0).wait() |
||||||
|
invocation_grpc_link.stop() |
||||||
|
service_grpc_link.stop_gracefully() |
||||||
|
service_end_link.stop(0).wait() |
||||||
|
invocation_end_link.join_link(utilities.NULL_LINK) |
||||||
|
invocation_grpc_link.join_link(utilities.NULL_LINK) |
||||||
|
service_grpc_link.join_link(utilities.NULL_LINK) |
||||||
|
service_end_link.join_link(utilities.NULL_LINK) |
||||||
|
pool.shutdown(wait=True) |
||||||
|
|
||||||
|
def invocation_metadata(self): |
||||||
|
return test_common.INVOCATION_INITIAL_METADATA |
||||||
|
|
||||||
|
def initial_metadata(self): |
||||||
|
return test_common.SERVICE_INITIAL_METADATA |
||||||
|
|
||||||
|
def terminal_metadata(self): |
||||||
|
return test_common.SERVICE_TERMINAL_METADATA |
||||||
|
|
||||||
|
def code(self): |
||||||
|
return _intermediary_low.Code.OK |
||||||
|
|
||||||
|
def details(self): |
||||||
|
return test_common.DETAILS |
||||||
|
|
||||||
|
def metadata_transmitted(self, original_metadata, transmitted_metadata): |
||||||
|
return original_metadata is None or grpc_test_common.metadata_transmitted( |
||||||
|
original_metadata, transmitted_metadata) |
||||||
|
|
||||||
|
|
||||||
|
def load_tests(loader, tests, pattern): |
||||||
|
return unittest.TestSuite( |
||||||
|
tests=tuple( |
||||||
|
loader.loadTestsFromTestCase(test_case_class) |
||||||
|
for test_case_class in test_cases.test_cases(_Implementation()))) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
unittest.main(verbosity=2) |
@ -0,0 +1,30 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
|
@ -0,0 +1,180 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Tests of grpc.beta._connectivity_channel.""" |
||||||
|
|
||||||
|
import threading |
||||||
|
import time |
||||||
|
import unittest |
||||||
|
|
||||||
|
from grpc._adapter import _low |
||||||
|
from grpc._adapter import _types |
||||||
|
from grpc.beta import _connectivity_channel |
||||||
|
from grpc_test.framework.common import test_constants |
||||||
|
|
||||||
|
_MAPPING_FUNCTION = lambda integer: integer * 200 + 17 |
||||||
|
_MAPPING = { |
||||||
|
state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState} |
||||||
|
_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map( |
||||||
|
_MAPPING_FUNCTION, _types.ConnectivityState) |
||||||
|
|
||||||
|
|
||||||
|
def _drive_completion_queue(completion_queue): |
||||||
|
while True: |
||||||
|
event = completion_queue.next(time.time() + 24 * 60 * 60) |
||||||
|
if event.type == _types.EventType.QUEUE_SHUTDOWN: |
||||||
|
break |
||||||
|
|
||||||
|
|
||||||
|
class _Callback(object): |
||||||
|
|
||||||
|
def __init__(self): |
||||||
|
self._condition = threading.Condition() |
||||||
|
self._connectivities = [] |
||||||
|
|
||||||
|
def update(self, connectivity): |
||||||
|
with self._condition: |
||||||
|
self._connectivities.append(connectivity) |
||||||
|
self._condition.notify() |
||||||
|
|
||||||
|
def connectivities(self): |
||||||
|
with self._condition: |
||||||
|
return tuple(self._connectivities) |
||||||
|
|
||||||
|
def block_until_connectivities_satisfy(self, predicate): |
||||||
|
with self._condition: |
||||||
|
while True: |
||||||
|
connectivities = tuple(self._connectivities) |
||||||
|
if predicate(connectivities): |
||||||
|
return connectivities |
||||||
|
else: |
||||||
|
self._condition.wait() |
||||||
|
|
||||||
|
|
||||||
|
class ChannelConnectivityTest(unittest.TestCase): |
||||||
|
|
||||||
|
def test_lonely_channel_connectivity(self): |
||||||
|
low_channel = _low.Channel('localhost:12345', ()) |
||||||
|
callback = _Callback() |
||||||
|
|
||||||
|
connectivity_channel = _connectivity_channel.ConnectivityChannel( |
||||||
|
low_channel, _MAPPING) |
||||||
|
connectivity_channel.subscribe(callback.update, try_to_connect=False) |
||||||
|
first_connectivities = callback.block_until_connectivities_satisfy(bool) |
||||||
|
connectivity_channel.subscribe(callback.update, try_to_connect=True) |
||||||
|
second_connectivities = callback.block_until_connectivities_satisfy( |
||||||
|
lambda connectivities: 2 <= len(connectivities)) |
||||||
|
# Wait for a connection that will never happen. |
||||||
|
time.sleep(test_constants.SHORT_TIMEOUT) |
||||||
|
third_connectivities = callback.connectivities() |
||||||
|
connectivity_channel.unsubscribe(callback.update) |
||||||
|
fourth_connectivities = callback.connectivities() |
||||||
|
connectivity_channel.unsubscribe(callback.update) |
||||||
|
fifth_connectivities = callback.connectivities() |
||||||
|
|
||||||
|
self.assertSequenceEqual((_IDLE,), first_connectivities) |
||||||
|
self.assertNotIn(_READY, second_connectivities) |
||||||
|
self.assertNotIn(_READY, third_connectivities) |
||||||
|
self.assertNotIn(_READY, fourth_connectivities) |
||||||
|
self.assertNotIn(_READY, fifth_connectivities) |
||||||
|
|
||||||
|
def test_immediately_connectable_channel_connectivity(self): |
||||||
|
server_completion_queue = _low.CompletionQueue() |
||||||
|
server = _low.Server(server_completion_queue, []) |
||||||
|
port = server.add_http2_port('[::]:0') |
||||||
|
server.start() |
||||||
|
server_completion_queue_thread = threading.Thread( |
||||||
|
target=_drive_completion_queue, args=(server_completion_queue,)) |
||||||
|
server_completion_queue_thread.start() |
||||||
|
low_channel = _low.Channel('localhost:%d' % port, ()) |
||||||
|
first_callback = _Callback() |
||||||
|
second_callback = _Callback() |
||||||
|
|
||||||
|
connectivity_channel = _connectivity_channel.ConnectivityChannel( |
||||||
|
low_channel, _MAPPING) |
||||||
|
connectivity_channel.subscribe(first_callback.update, try_to_connect=False) |
||||||
|
first_connectivities = first_callback.block_until_connectivities_satisfy( |
||||||
|
bool) |
||||||
|
# Wait for a connection that will never happen because try_to_connect=True |
||||||
|
# has not yet been passed. |
||||||
|
time.sleep(test_constants.SHORT_TIMEOUT) |
||||||
|
second_connectivities = first_callback.connectivities() |
||||||
|
connectivity_channel.subscribe(second_callback.update, try_to_connect=True) |
||||||
|
third_connectivities = first_callback.block_until_connectivities_satisfy( |
||||||
|
lambda connectivities: 2 <= len(connectivities)) |
||||||
|
fourth_connectivities = second_callback.block_until_connectivities_satisfy( |
||||||
|
bool) |
||||||
|
# Wait for a connection that will happen (or may already have happened). |
||||||
|
first_callback.block_until_connectivities_satisfy( |
||||||
|
lambda connectivities: _READY in connectivities) |
||||||
|
second_callback.block_until_connectivities_satisfy( |
||||||
|
lambda connectivities: _READY in connectivities) |
||||||
|
connectivity_channel.unsubscribe(first_callback.update) |
||||||
|
connectivity_channel.unsubscribe(second_callback.update) |
||||||
|
|
||||||
|
server.shutdown() |
||||||
|
server_completion_queue.shutdown() |
||||||
|
server_completion_queue_thread.join() |
||||||
|
|
||||||
|
self.assertSequenceEqual((_IDLE,), first_connectivities) |
||||||
|
self.assertSequenceEqual((_IDLE,), second_connectivities) |
||||||
|
self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities) |
||||||
|
self.assertNotIn(_FATAL_FAILURE, third_connectivities) |
||||||
|
self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities) |
||||||
|
self.assertNotIn(_FATAL_FAILURE, fourth_connectivities) |
||||||
|
|
||||||
|
def test_reachable_then_unreachable_channel_connectivity(self): |
||||||
|
server_completion_queue = _low.CompletionQueue() |
||||||
|
server = _low.Server(server_completion_queue, []) |
||||||
|
port = server.add_http2_port('[::]:0') |
||||||
|
server.start() |
||||||
|
server_completion_queue_thread = threading.Thread( |
||||||
|
target=_drive_completion_queue, args=(server_completion_queue,)) |
||||||
|
server_completion_queue_thread.start() |
||||||
|
low_channel = _low.Channel('localhost:%d' % port, ()) |
||||||
|
callback = _Callback() |
||||||
|
|
||||||
|
connectivity_channel = _connectivity_channel.ConnectivityChannel( |
||||||
|
low_channel, _MAPPING) |
||||||
|
connectivity_channel.subscribe(callback.update, try_to_connect=True) |
||||||
|
callback.block_until_connectivities_satisfy( |
||||||
|
lambda connectivities: _READY in connectivities) |
||||||
|
# Now take down the server and confirm that channel readiness is repudiated. |
||||||
|
server.shutdown() |
||||||
|
callback.block_until_connectivities_satisfy( |
||||||
|
lambda connectivities: connectivities[-1] is not _READY) |
||||||
|
connectivity_channel.unsubscribe(callback.update) |
||||||
|
|
||||||
|
server.shutdown() |
||||||
|
server_completion_queue.shutdown() |
||||||
|
server_completion_queue_thread.join() |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
unittest.main(verbosity=2) |
@ -0,0 +1,123 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Tests of grpc.beta.utilities.""" |
||||||
|
|
||||||
|
import threading |
||||||
|
import time |
||||||
|
import unittest |
||||||
|
|
||||||
|
from grpc._adapter import _low |
||||||
|
from grpc._adapter import _types |
||||||
|
from grpc.beta import beta |
||||||
|
from grpc.beta import utilities |
||||||
|
from grpc.framework.foundation import future |
||||||
|
from grpc_test.framework.common import test_constants |
||||||
|
|
||||||
|
|
||||||
|
def _drive_completion_queue(completion_queue): |
||||||
|
while True: |
||||||
|
event = completion_queue.next(time.time() + 24 * 60 * 60) |
||||||
|
if event.type == _types.EventType.QUEUE_SHUTDOWN: |
||||||
|
break |
||||||
|
|
||||||
|
|
||||||
|
class _Callback(object): |
||||||
|
|
||||||
|
def __init__(self): |
||||||
|
self._condition = threading.Condition() |
||||||
|
self._value = None |
||||||
|
|
||||||
|
def accept_value(self, value): |
||||||
|
with self._condition: |
||||||
|
self._value = value |
||||||
|
self._condition.notify_all() |
||||||
|
|
||||||
|
def block_until_called(self): |
||||||
|
with self._condition: |
||||||
|
while self._value is None: |
||||||
|
self._condition.wait() |
||||||
|
return self._value |
||||||
|
|
||||||
|
|
||||||
|
class ChannelConnectivityTest(unittest.TestCase): |
||||||
|
|
||||||
|
def test_lonely_channel_connectivity(self): |
||||||
|
channel = beta.create_insecure_channel('localhost', 12345) |
||||||
|
callback = _Callback() |
||||||
|
|
||||||
|
ready_future = utilities.channel_ready_future(channel) |
||||||
|
ready_future.add_done_callback(callback.accept_value) |
||||||
|
with self.assertRaises(future.TimeoutError): |
||||||
|
ready_future.result(test_constants.SHORT_TIMEOUT) |
||||||
|
self.assertFalse(ready_future.cancelled()) |
||||||
|
self.assertFalse(ready_future.done()) |
||||||
|
self.assertTrue(ready_future.running()) |
||||||
|
ready_future.cancel() |
||||||
|
value_passed_to_callback = callback.block_until_called() |
||||||
|
self.assertIs(ready_future, value_passed_to_callback) |
||||||
|
self.assertTrue(ready_future.cancelled()) |
||||||
|
self.assertTrue(ready_future.done()) |
||||||
|
self.assertFalse(ready_future.running()) |
||||||
|
|
||||||
|
def test_immediately_connectable_channel_connectivity(self): |
||||||
|
server_completion_queue = _low.CompletionQueue() |
||||||
|
server = _low.Server(server_completion_queue, []) |
||||||
|
port = server.add_http2_port('[::]:0') |
||||||
|
server.start() |
||||||
|
server_completion_queue_thread = threading.Thread( |
||||||
|
target=_drive_completion_queue, args=(server_completion_queue,)) |
||||||
|
server_completion_queue_thread.start() |
||||||
|
channel = beta.create_insecure_channel('localhost', port) |
||||||
|
callback = _Callback() |
||||||
|
|
||||||
|
try: |
||||||
|
ready_future = utilities.channel_ready_future(channel) |
||||||
|
ready_future.add_done_callback(callback.accept_value) |
||||||
|
self.assertIsNone( |
||||||
|
ready_future.result(test_constants.SHORT_TIMEOUT)) |
||||||
|
value_passed_to_callback = callback.block_until_called() |
||||||
|
self.assertIs(ready_future, value_passed_to_callback) |
||||||
|
self.assertFalse(ready_future.cancelled()) |
||||||
|
self.assertTrue(ready_future.done()) |
||||||
|
self.assertFalse(ready_future.running()) |
||||||
|
# Cancellation after maturity has no effect. |
||||||
|
ready_future.cancel() |
||||||
|
self.assertFalse(ready_future.cancelled()) |
||||||
|
self.assertTrue(ready_future.done()) |
||||||
|
self.assertFalse(ready_future.running()) |
||||||
|
finally: |
||||||
|
ready_future.cancel() |
||||||
|
server.shutdown() |
||||||
|
server_completion_queue.shutdown() |
||||||
|
server_completion_queue_thread.join() |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
unittest.main(verbosity=2) |
@ -0,0 +1,111 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""Tests Face interface compliance of the crust-over-core stack.""" |
||||||
|
|
||||||
|
import collections |
||||||
|
import unittest |
||||||
|
|
||||||
|
from grpc.framework.core import implementations as core_implementations |
||||||
|
from grpc.framework.crust import implementations as crust_implementations |
||||||
|
from grpc.framework.foundation import logging_pool |
||||||
|
from grpc.framework.interfaces.links import utilities |
||||||
|
from grpc_test.framework.common import test_constants |
||||||
|
from grpc_test.framework.interfaces.face import test_cases |
||||||
|
from grpc_test.framework.interfaces.face import test_interfaces |
||||||
|
from grpc_test.framework.interfaces.links import test_utilities |
||||||
|
|
||||||
|
|
||||||
|
class _Implementation(test_interfaces.Implementation): |
||||||
|
|
||||||
|
def instantiate( |
||||||
|
self, methods, method_implementations, multi_method_implementation): |
||||||
|
pool = logging_pool.pool(test_constants.POOL_SIZE) |
||||||
|
servicer = crust_implementations.servicer( |
||||||
|
method_implementations, multi_method_implementation, pool) |
||||||
|
|
||||||
|
service_end_link = core_implementations.service_end_link( |
||||||
|
servicer, test_constants.DEFAULT_TIMEOUT, |
||||||
|
test_constants.MAXIMUM_TIMEOUT) |
||||||
|
invocation_end_link = core_implementations.invocation_end_link() |
||||||
|
invocation_end_link.join_link(service_end_link) |
||||||
|
service_end_link.join_link(invocation_end_link) |
||||||
|
service_end_link.start() |
||||||
|
invocation_end_link.start() |
||||||
|
|
||||||
|
generic_stub = crust_implementations.generic_stub(invocation_end_link, pool) |
||||||
|
# TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. |
||||||
|
group = next(iter(methods))[0] |
||||||
|
# TODO(nathaniel): Add a "cardinalities_by_group" attribute to |
||||||
|
# _digest.TestServiceDigest. |
||||||
|
cardinalities = { |
||||||
|
method: method_object.cardinality() |
||||||
|
for (group, method), method_object in methods.iteritems()} |
||||||
|
dynamic_stub = crust_implementations.dynamic_stub( |
||||||
|
invocation_end_link, group, cardinalities, pool) |
||||||
|
|
||||||
|
return generic_stub, {group: dynamic_stub}, ( |
||||||
|
invocation_end_link, service_end_link, pool) |
||||||
|
|
||||||
|
def destantiate(self, memo): |
||||||
|
invocation_end_link, service_end_link, pool = memo |
||||||
|
invocation_end_link.stop(0).wait() |
||||||
|
service_end_link.stop(0).wait() |
||||||
|
invocation_end_link.join_link(utilities.NULL_LINK) |
||||||
|
service_end_link.join_link(utilities.NULL_LINK) |
||||||
|
pool.shutdown(wait=True) |
||||||
|
|
||||||
|
def invocation_metadata(self): |
||||||
|
return object() |
||||||
|
|
||||||
|
def initial_metadata(self): |
||||||
|
return object() |
||||||
|
|
||||||
|
def terminal_metadata(self): |
||||||
|
return object() |
||||||
|
|
||||||
|
def code(self): |
||||||
|
return object() |
||||||
|
|
||||||
|
def details(self): |
||||||
|
return object() |
||||||
|
|
||||||
|
def metadata_transmitted(self, original_metadata, transmitted_metadata): |
||||||
|
return original_metadata is transmitted_metadata |
||||||
|
|
||||||
|
|
||||||
|
def load_tests(loader, tests, pattern): |
||||||
|
return unittest.TestSuite( |
||||||
|
tests=tuple( |
||||||
|
loader.loadTestsFromTestCase(test_case_class) |
||||||
|
for test_case_class in test_cases.test_cases(_Implementation()))) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
unittest.main(verbosity=2) |
@ -0,0 +1,37 @@ |
|||||||
|
# Copyright 2015, Google Inc. |
||||||
|
# All rights reserved. |
||||||
|
# |
||||||
|
# Redistribution and use in source and binary forms, with or without |
||||||
|
# modification, are permitted provided that the following conditions are |
||||||
|
# met: |
||||||
|
# |
||||||
|
# * Redistributions of source code must retain the above copyright |
||||||
|
# notice, this list of conditions and the following disclaimer. |
||||||
|
# * Redistributions in binary form must reproduce the above |
||||||
|
# copyright notice, this list of conditions and the following disclaimer |
||||||
|
# in the documentation and/or other materials provided with the |
||||||
|
# distribution. |
||||||
|
# * Neither the name of Google Inc. nor the names of its |
||||||
|
# contributors may be used to endorse or promote products derived from |
||||||
|
# this software without specific prior written permission. |
||||||
|
# |
||||||
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||||
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||||
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||||
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||||
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||||
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||||
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||||
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||||
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||||
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||||
|
|
||||||
|
"""A test constant working around issue 3069.""" |
||||||
|
|
||||||
|
# test_constants is referenced from specification in this module. |
||||||
|
from grpc_test.framework.common import test_constants # pylint: disable=unused-import |
||||||
|
|
||||||
|
# TODO(issue 3069): Replace uses of this constant with |
||||||
|
# test_constants.SHORT_TIMEOUT. |
||||||
|
REALLY_SHORT_TIMEOUT = 0.1 |
Loading…
Reference in new issue