mirror of https://github.com/grpc/grpc.git
commit
391a6c7786
50 changed files with 1284 additions and 609 deletions
@ -0,0 +1,176 @@ |
||||
# Copyright 2015, Google Inc. |
||||
# All rights reserved. |
||||
# |
||||
# Redistribution and use in source and binary forms, with or without |
||||
# modification, are permitted provided that the following conditions are |
||||
# met: |
||||
# |
||||
# * Redistributions of source code must retain the above copyright |
||||
# notice, this list of conditions and the following disclaimer. |
||||
# * Redistributions in binary form must reproduce the above |
||||
# copyright notice, this list of conditions and the following disclaimer |
||||
# in the documentation and/or other materials provided with the |
||||
# distribution. |
||||
# * Neither the name of Google Inc. nor the names of its |
||||
# contributors may be used to endorse or promote products derived from |
||||
# this software without specific prior written permission. |
||||
# |
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
"""State and behavior for passing protocol objects in an operation.""" |
||||
|
||||
import collections |
||||
import enum |
||||
|
||||
from grpc.framework.core import _constants |
||||
from grpc.framework.core import _interfaces |
||||
from grpc.framework.core import _utilities |
||||
from grpc.framework.foundation import callable_util |
||||
from grpc.framework.interfaces.base import base |
||||
|
||||
_EXCEPTION_LOG_MESSAGE = 'Exception delivering protocol object!' |
||||
|
||||
_LOCAL_FAILURE_OUTCOME = _utilities.Outcome( |
||||
base.Outcome.Kind.LOCAL_FAILURE, None, None) |
||||
|
||||
|
||||
class _Awaited( |
||||
collections.namedtuple('_Awaited', ('kind', 'value',))): |
||||
|
||||
@enum.unique |
||||
class Kind(enum.Enum): |
||||
NOT_YET_ARRIVED = 'not yet arrived' |
||||
ARRIVED = 'arrived' |
||||
|
||||
_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None) |
||||
_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None) |
||||
|
||||
|
||||
class _Transitory( |
||||
collections.namedtuple('_Transitory', ('kind', 'value',))): |
||||
|
||||
@enum.unique |
||||
class Kind(enum.Enum): |
||||
NOT_YET_SEEN = 'not yet seen' |
||||
PRESENT = 'present' |
||||
GONE = 'gone' |
||||
|
||||
_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None) |
||||
_GONE = _Transitory(_Transitory.Kind.GONE, None) |
||||
|
||||
|
||||
class _ProtocolManager(_interfaces.ProtocolManager): |
||||
"""An implementation of _interfaces.ExpirationManager.""" |
||||
|
||||
def __init__( |
||||
self, protocol_receiver, lock, pool, termination_manager, |
||||
transmission_manager, expiration_manager): |
||||
"""Constructor. |
||||
|
||||
Args: |
||||
protocol_receiver: An _Awaited wrapping of the base.ProtocolReceiver to |
||||
which protocol objects should be passed during the operation. May be |
||||
of kind _Awaited.Kind.NOT_YET_ARRIVED if the customer's subscription is |
||||
not yet known and may be of kind _Awaited.Kind.ARRIVED but with a value |
||||
of None if the customer's subscription did not include a |
||||
ProtocolReceiver. |
||||
lock: The operation-wide lock. |
||||
pool: A thread pool. |
||||
termination_manager: The _interfaces.TerminationManager for the operation. |
||||
transmission_manager: The _interfaces.TransmissionManager for the |
||||
operation. |
||||
expiration_manager: The _interfaces.ExpirationManager for the operation. |
||||
""" |
||||
self._lock = lock |
||||
self._pool = pool |
||||
self._termination_manager = termination_manager |
||||
self._transmission_manager = transmission_manager |
||||
self._expiration_manager = expiration_manager |
||||
|
||||
self._protocol_receiver = protocol_receiver |
||||
self._context = _NOT_YET_SEEN |
||||
|
||||
def _abort_and_notify(self, outcome): |
||||
if self._termination_manager.outcome is None: |
||||
self._termination_manager.abort(outcome) |
||||
self._transmission_manager.abort(outcome) |
||||
self._expiration_manager.terminate() |
||||
|
||||
def _deliver(self, behavior, value): |
||||
def deliver(): |
||||
delivery_outcome = callable_util.call_logging_exceptions( |
||||
behavior, _EXCEPTION_LOG_MESSAGE, value) |
||||
if delivery_outcome.kind is callable_util.Outcome.Kind.RAISED: |
||||
with self._lock: |
||||
self._abort_and_notify(_LOCAL_FAILURE_OUTCOME) |
||||
self._pool.submit( |
||||
callable_util.with_exceptions_logged( |
||||
deliver, _constants.INTERNAL_ERROR_LOG_MESSAGE)) |
||||
|
||||
def set_protocol_receiver(self, protocol_receiver): |
||||
"""See _interfaces.ProtocolManager.set_protocol_receiver for spec.""" |
||||
self._protocol_receiver = _Awaited(_Awaited.Kind.ARRIVED, protocol_receiver) |
||||
if (self._context.kind is _Transitory.Kind.PRESENT and |
||||
protocol_receiver is not None): |
||||
self._deliver(protocol_receiver.context, self._context.value) |
||||
self._context = _GONE |
||||
|
||||
def accept_protocol_context(self, protocol_context): |
||||
"""See _interfaces.ProtocolManager.accept_protocol_context for spec.""" |
||||
if self._protocol_receiver.kind is _Awaited.Kind.ARRIVED: |
||||
if self._protocol_receiver.value is not None: |
||||
self._deliver(self._protocol_receiver.value.context, protocol_context) |
||||
self._context = _GONE |
||||
else: |
||||
self._context = _Transitory(_Transitory.Kind.PRESENT, protocol_context) |
||||
|
||||
|
||||
def invocation_protocol_manager( |
||||
subscription, lock, pool, termination_manager, transmission_manager, |
||||
expiration_manager): |
||||
"""Creates an _interfaces.ProtocolManager for invocation-side use. |
||||
|
||||
Args: |
||||
subscription: The local customer's subscription to the operation. |
||||
lock: The operation-wide lock. |
||||
pool: A thread pool. |
||||
termination_manager: The _interfaces.TerminationManager for the operation. |
||||
transmission_manager: The _interfaces.TransmissionManager for the |
||||
operation. |
||||
expiration_manager: The _interfaces.ExpirationManager for the operation. |
||||
""" |
||||
if subscription.kind is base.Subscription.Kind.FULL: |
||||
awaited_protocol_receiver = _Awaited( |
||||
_Awaited.Kind.ARRIVED, subscription.protocol_receiver) |
||||
else: |
||||
awaited_protocol_receiver = _ARRIVED_AND_NONE |
||||
return _ProtocolManager( |
||||
awaited_protocol_receiver, lock, pool, termination_manager, |
||||
transmission_manager, expiration_manager) |
||||
|
||||
|
||||
def service_protocol_manager( |
||||
lock, pool, termination_manager, transmission_manager, expiration_manager): |
||||
"""Creates an _interfaces.ProtocolManager for service-side use. |
||||
|
||||
Args: |
||||
lock: The operation-wide lock. |
||||
pool: A thread pool. |
||||
termination_manager: The _interfaces.TerminationManager for the operation. |
||||
transmission_manager: The _interfaces.TransmissionManager for the |
||||
operation. |
||||
expiration_manager: The _interfaces.ExpirationManager for the operation. |
||||
""" |
||||
return _ProtocolManager( |
||||
_NOT_YET_ARRIVED, lock, pool, termination_manager, transmission_manager, |
||||
expiration_manager) |
@ -0,0 +1,232 @@ |
||||
# Copyright 2015, Google Inc. |
||||
# All rights reserved. |
||||
# |
||||
# Redistribution and use in source and binary forms, with or without |
||||
# modification, are permitted provided that the following conditions are |
||||
# met: |
||||
# |
||||
# * Redistributions of source code must retain the above copyright |
||||
# notice, this list of conditions and the following disclaimer. |
||||
# * Redistributions in binary form must reproduce the above |
||||
# copyright notice, this list of conditions and the following disclaimer |
||||
# in the documentation and/or other materials provided with the |
||||
# distribution. |
||||
# * Neither the name of Google Inc. nor the names of its |
||||
# contributors may be used to endorse or promote products derived from |
||||
# this software without specific prior written permission. |
||||
# |
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
"""Tests Face interface compliance of the gRPC Python Beta API.""" |
||||
|
||||
import threading |
||||
import unittest |
||||
|
||||
from grpc.beta import implementations |
||||
from grpc.beta import interfaces |
||||
from grpc.framework.common import cardinality |
||||
from grpc.framework.interfaces.face import utilities |
||||
from grpc_test import resources |
||||
from grpc_test.beta import test_utilities |
||||
from grpc_test.framework.common import test_constants |
||||
|
||||
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' |
||||
|
||||
_GROUP = 'group' |
||||
_UNARY_UNARY = 'unary-unary' |
||||
_UNARY_STREAM = 'unary-stream' |
||||
_STREAM_UNARY = 'stream-unary' |
||||
_STREAM_STREAM = 'stream-stream' |
||||
|
||||
_REQUEST = b'abc' |
||||
_RESPONSE = b'123' |
||||
|
||||
|
||||
class _Servicer(object): |
||||
|
||||
def __init__(self): |
||||
self._condition = threading.Condition() |
||||
self._peer = None |
||||
self._serviced = False |
||||
|
||||
def unary_unary(self, request, context): |
||||
with self._condition: |
||||
self._request = request |
||||
self._peer = context.protocol_context().peer() |
||||
context.protocol_context().disable_next_response_compression() |
||||
self._serviced = True |
||||
self._condition.notify_all() |
||||
return _RESPONSE |
||||
|
||||
def unary_stream(self, request, context): |
||||
with self._condition: |
||||
self._request = request |
||||
self._peer = context.protocol_context().peer() |
||||
context.protocol_context().disable_next_response_compression() |
||||
self._serviced = True |
||||
self._condition.notify_all() |
||||
return |
||||
yield |
||||
|
||||
def stream_unary(self, request_iterator, context): |
||||
for request in request_iterator: |
||||
self._request = request |
||||
with self._condition: |
||||
self._peer = context.protocol_context().peer() |
||||
context.protocol_context().disable_next_response_compression() |
||||
self._serviced = True |
||||
self._condition.notify_all() |
||||
return _RESPONSE |
||||
|
||||
def stream_stream(self, request_iterator, context): |
||||
for request in request_iterator: |
||||
with self._condition: |
||||
self._peer = context.protocol_context().peer() |
||||
context.protocol_context().disable_next_response_compression() |
||||
yield _RESPONSE |
||||
with self._condition: |
||||
self._serviced = True |
||||
self._condition.notify_all() |
||||
|
||||
def peer(self): |
||||
with self._condition: |
||||
return self._peer |
||||
|
||||
def block_until_serviced(self): |
||||
with self._condition: |
||||
while not self._serviced: |
||||
self._condition.wait() |
||||
|
||||
|
||||
class _BlockingIterator(object): |
||||
|
||||
def __init__(self, upstream): |
||||
self._condition = threading.Condition() |
||||
self._upstream = upstream |
||||
self._allowed = [] |
||||
|
||||
def __iter__(self): |
||||
return self |
||||
|
||||
def next(self): |
||||
with self._condition: |
||||
while True: |
||||
if self._allowed is None: |
||||
raise StopIteration() |
||||
elif self._allowed: |
||||
return self._allowed.pop(0) |
||||
else: |
||||
self._condition.wait() |
||||
|
||||
def allow(self): |
||||
with self._condition: |
||||
try: |
||||
self._allowed.append(next(self._upstream)) |
||||
except StopIteration: |
||||
self._allowed = None |
||||
self._condition.notify_all() |
||||
|
||||
|
||||
class BetaFeaturesTest(unittest.TestCase): |
||||
|
||||
def setUp(self): |
||||
self._servicer = _Servicer() |
||||
method_implementations = { |
||||
(_GROUP, _UNARY_UNARY): |
||||
utilities.unary_unary_inline(self._servicer.unary_unary), |
||||
(_GROUP, _UNARY_STREAM): |
||||
utilities.unary_stream_inline(self._servicer.unary_stream), |
||||
(_GROUP, _STREAM_UNARY): |
||||
utilities.stream_unary_inline(self._servicer.stream_unary), |
||||
(_GROUP, _STREAM_STREAM): |
||||
utilities.stream_stream_inline(self._servicer.stream_stream), |
||||
} |
||||
|
||||
cardinalities = { |
||||
_UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, |
||||
_UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, |
||||
_STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, |
||||
_STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, |
||||
} |
||||
|
||||
server_options = implementations.server_options( |
||||
thread_pool_size=test_constants.POOL_SIZE) |
||||
self._server = implementations.server( |
||||
method_implementations, options=server_options) |
||||
server_credentials = implementations.ssl_server_credentials( |
||||
[(resources.private_key(), resources.certificate_chain(),),]) |
||||
port = self._server.add_secure_port('[::]:0', server_credentials) |
||||
self._server.start() |
||||
self._client_credentials = implementations.ssl_client_credentials( |
||||
resources.test_root_certificates(), None, None) |
||||
channel = test_utilities.not_really_secure_channel( |
||||
'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE) |
||||
stub_options = implementations.stub_options( |
||||
thread_pool_size=test_constants.POOL_SIZE) |
||||
self._dynamic_stub = implementations.dynamic_stub( |
||||
channel, _GROUP, cardinalities, options=stub_options) |
||||
|
||||
def tearDown(self): |
||||
self._dynamic_stub = None |
||||
self._server.stop(test_constants.SHORT_TIMEOUT).wait() |
||||
|
||||
def test_unary_unary(self): |
||||
call_options = interfaces.grpc_call_options( |
||||
disable_compression=True, credentials=self._client_credentials) |
||||
response = getattr(self._dynamic_stub, _UNARY_UNARY)( |
||||
_REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options) |
||||
self.assertEqual(_RESPONSE, response) |
||||
self.assertIsNotNone(self._servicer.peer()) |
||||
|
||||
def test_unary_stream(self): |
||||
call_options = interfaces.grpc_call_options( |
||||
disable_compression=True, credentials=self._client_credentials) |
||||
response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)( |
||||
_REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options) |
||||
self._servicer.block_until_serviced() |
||||
self.assertIsNotNone(self._servicer.peer()) |
||||
|
||||
def test_stream_unary(self): |
||||
call_options = interfaces.grpc_call_options( |
||||
credentials=self._client_credentials) |
||||
request_iterator = _BlockingIterator(iter((_REQUEST,))) |
||||
response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future( |
||||
request_iterator, test_constants.LONG_TIMEOUT, |
||||
protocol_options=call_options) |
||||
response_future.protocol_context().disable_next_request_compression() |
||||
request_iterator.allow() |
||||
response_future.protocol_context().disable_next_request_compression() |
||||
request_iterator.allow() |
||||
self._servicer.block_until_serviced() |
||||
self.assertIsNotNone(self._servicer.peer()) |
||||
self.assertEqual(_RESPONSE, response_future.result()) |
||||
|
||||
def test_stream_stream(self): |
||||
call_options = interfaces.grpc_call_options( |
||||
credentials=self._client_credentials) |
||||
request_iterator = _BlockingIterator(iter((_REQUEST,))) |
||||
response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)( |
||||
request_iterator, test_constants.SHORT_TIMEOUT, |
||||
protocol_options=call_options) |
||||
response_iterator.protocol_context().disable_next_request_compression() |
||||
request_iterator.allow() |
||||
response = next(response_iterator) |
||||
response_iterator.protocol_context().disable_next_request_compression() |
||||
request_iterator.allow() |
||||
self._servicer.block_until_serviced() |
||||
self.assertIsNotNone(self._servicer.peer()) |
||||
self.assertEqual(_RESPONSE, response) |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
unittest.main(verbosity=2) |
Loading…
Reference in new issue