links interface and gRPC-on-wire implementation

This code branches out of the alpha/early_adopter code and begins
building toward the beta API.
pull/2251/head
Nathaniel Manista 10 years ago
parent 294d972629
commit 3abe61850e
  1. 30
      src/python/src/grpc/_links/__init__.py
  2. 88
      src/python/src/grpc/_links/_lonely_invocation_link_test.py
  3. 261
      src/python/src/grpc/_links/_proto_scenarios.py
  4. 226
      src/python/src/grpc/_links/_transmission_test.py
  5. 363
      src/python/src/grpc/_links/invocation.py
  6. 402
      src/python/src/grpc/_links/service.py
  7. 37
      src/python/src/grpc/framework/common/test_constants.py
  8. 87
      src/python/src/grpc/framework/common/test_control.py
  9. 116
      src/python/src/grpc/framework/common/test_coverage.py
  10. 175
      src/python/src/grpc/framework/foundation/relay.py
  11. 30
      src/python/src/grpc/framework/interfaces/__init__.py
  12. 30
      src/python/src/grpc/framework/interfaces/links/__init__.py
  13. 124
      src/python/src/grpc/framework/interfaces/links/links.py
  14. 332
      src/python/src/grpc/framework/interfaces/links/test_cases.py
  15. 66
      src/python/src/grpc/framework/interfaces/links/test_utilities.py
  16. 44
      src/python/src/grpc/framework/interfaces/links/utilities.py
  17. 4
      src/python/src/setup.py
  18. 12
      tools/run_tests/python_tests.json

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,88 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A test of invocation-side code unconnected to an RPC server."""
import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc.framework.common import test_constants
from grpc.framework.interfaces.links import links
from grpc.framework.interfaces.links import test_cases
from grpc.framework.interfaces.links import test_utilities
_NULL_BEHAVIOR = lambda unused_argument: None
class LonelyInvocationLinkTest(unittest.TestCase):
def testUpAndDown(self):
channel = _intermediary_low.Channel('nonexistent:54321', None)
invocation_link = invocation.invocation_link(channel, 'nonexistent', {}, {})
invocation_link.start()
invocation_link.stop()
def _test_lonely_invocation_with_termination(self, termination):
test_operation_id = object()
test_group = 'test package.Test Service'
test_method = 'test method'
invocation_link_mate = test_utilities.RecordingLink()
channel = _intermediary_low.Channel('nonexistent:54321', None)
invocation_link = invocation.invocation_link(
channel, 'nonexistent', {(test_group, test_method): _NULL_BEHAVIOR},
{(test_group, test_method): _NULL_BEHAVIOR})
invocation_link.join_link(invocation_link_mate)
invocation_link.start()
ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None,
None, None, None, None, termination)
invocation_link.accept_ticket(ticket)
invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
self.assertIsNot(
invocation_link_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
def testLonelyInvocationLinkWithCommencementTicket(self):
self._test_lonely_invocation_with_termination(None)
def testLonelyInvocationLinkWithEntireTicket(self):
self._test_lonely_invocation_with_termination(
links.Ticket.Termination.COMPLETION)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,261 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test scenarios using protocol buffers."""
import abc
import threading
from grpc._junkdrawer import math_pb2
class ProtoScenario(object):
"""An RPC test scenario using protocol buffers."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def group_and_method(self):
"""Access the test group and method.
Returns:
The test group and method as a pair.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_request(self, request):
"""Serialize a request protocol buffer.
Args:
request: A request protocol buffer.
Returns:
The bytestring serialization of the given request protocol buffer.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_request(self, request_bytestring):
"""Deserialize a request protocol buffer.
Args:
request_bytestring: The bytestring serialization of a request protocol
buffer.
Returns:
The request protocol buffer deserialized from the given byte string.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_response(self, response):
"""Serialize a response protocol buffer.
Args:
response: A response protocol buffer.
Returns:
The bytestring serialization of the given response protocol buffer.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_response(self, response_bytestring):
"""Deserialize a response protocol buffer.
Args:
response_bytestring: The bytestring serialization of a response protocol
buffer.
Returns:
The response protocol buffer deserialized from the given byte string.
"""
raise NotImplementedError()
@abc.abstractmethod
def requests(self):
"""Access the sequence of requests for this scenario.
Returns:
A sequence of request protocol buffers.
"""
raise NotImplementedError()
@abc.abstractmethod
def response_for_request(self, request):
"""Access the response for a particular request.
Args:
request: A request protocol buffer.
Returns:
The response protocol buffer appropriate for the given request.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify_requests(self, experimental_requests):
"""Verify the requests transmitted through the system under test.
Args:
experimental_requests: The request protocol buffers transmitted through
the system under test.
Returns:
True if the requests satisfy this test scenario; False otherwise.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify_responses(self, experimental_responses):
"""Verify the responses transmitted through the system under test.
Args:
experimental_responses: The response protocol buffers transmitted through
the system under test.
Returns:
True if the responses satisfy this test scenario; False otherwise.
"""
raise NotImplementedError()
class EmptyScenario(ProtoScenario):
"""A scenario that transmits no protocol buffers in either direction."""
def group_and_method(self):
return 'math.Math', 'DivMany'
def serialize_request(self, request):
raise ValueError('This should not be necessary to call!')
def deserialize_request(self, request_bytestring):
raise ValueError('This should not be necessary to call!')
def serialize_response(self, response):
raise ValueError('This should not be necessary to call!')
def deserialize_response(self, response_bytestring):
raise ValueError('This should not be necessary to call!')
def requests(self):
return ()
def response_for_request(self, request):
raise ValueError('This should not be necessary to call!')
def verify_requests(self, experimental_requests):
return not experimental_requests
def verify_responses(self, experimental_responses):
return not experimental_responses
class BidirectionallyUnaryScenario(ProtoScenario):
"""A scenario that transmits no protocol buffers in either direction."""
_DIVIDEND = 59
_DIVISOR = 7
_QUOTIENT = 8
_REMAINDER = 3
_REQUEST = math_pb2.DivArgs(dividend=_DIVIDEND, divisor=_DIVISOR)
_RESPONSE = math_pb2.DivReply(quotient=_QUOTIENT, remainder=_REMAINDER)
def group_and_method(self):
return 'math.Math', 'Div'
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, request_bytestring):
return math_pb2.DivArgs.FromString(request_bytestring)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, response_bytestring):
return math_pb2.DivReply.FromString(response_bytestring)
def requests(self):
return [self._REQUEST]
def response_for_request(self, request):
return self._RESPONSE
def verify_requests(self, experimental_requests):
return tuple(experimental_requests) == (self._REQUEST,)
def verify_responses(self, experimental_responses):
return tuple(experimental_responses) == (self._RESPONSE,)
class BidirectionallyStreamingScenario(ProtoScenario):
"""A scenario that transmits no protocol buffers in either direction."""
_STREAM_LENGTH = 200
_REQUESTS = tuple(
math_pb2.DivArgs(dividend=59 + index, divisor=7 + index)
for index in range(_STREAM_LENGTH))
def __init__(self):
self._lock = threading.Lock()
self._responses = []
def group_and_method(self):
return 'math.Math', 'DivMany'
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, request_bytestring):
return math_pb2.DivArgs.FromString(request_bytestring)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, response_bytestring):
return math_pb2.DivReply.FromString(response_bytestring)
def requests(self):
return self._REQUESTS
def response_for_request(self, request):
quotient, remainder = divmod(request.dividend, request.divisor)
response = math_pb2.DivReply(quotient=quotient, remainder=remainder)
with self._lock:
self._responses.append(response)
return response
def verify_requests(self, experimental_requests):
return tuple(experimental_requests) == self._REQUESTS
def verify_responses(self, experimental_responses):
with self._lock:
return tuple(experimental_responses) == tuple(self._responses)

@ -0,0 +1,226 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests transmission of tickets across gRPC-on-the-wire."""
import unittest
from grpc._adapter import _intermediary_low
from grpc._links import _proto_scenarios
from grpc._links import invocation
from grpc._links import service
from grpc.framework.common import test_constants
from grpc.framework.interfaces.links import links
from grpc.framework.interfaces.links import test_cases
from grpc.framework.interfaces.links import test_utilities
_IDENTITY = lambda x: x
class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
def create_transmitting_links(self):
service_link = service.service_link(
{self.group_and_method(): self.deserialize_request},
{self.group_and_method(): self.serialize_response})
port = service_link.add_port(0, None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, 'localhost',
{self.group_and_method(): self.serialize_request},
{self.group_and_method(): self.deserialize_response})
invocation_link.start()
return invocation_link, service_link
def destroy_transmitting_links(self, invocation_side_link, service_side_link):
invocation_side_link.stop()
service_side_link.stop_gracefully()
def create_invocation_initial_metadata(self):
return (
('first invocation initial metadata key', 'just a string value'),
('second invocation initial metadata key', '0123456789'),
('third invocation initial metadata key-bin', '\x00\x57' * 100),
)
def create_invocation_terminal_metadata(self):
return None
def create_service_initial_metadata(self):
return (
('first service initial metadata key', 'just another string value'),
('second service initial metadata key', '9876543210'),
('third service initial metadata key-bin', '\x00\x59\x02' * 100),
)
def create_service_terminal_metadata(self):
return (
('first service terminal metadata key', 'yet another string value'),
('second service terminal metadata key', 'abcdefghij'),
('third service terminal metadata key-bin', '\x00\x37' * 100),
)
def create_invocation_completion(self):
return None, None
def create_service_completion(self):
return _intermediary_low.Code.OK, 'An exuberant test "details" message!'
def assertMetadataEqual(self, original_metadata, transmitted_metadata):
self.assertSequenceEqual(original_metadata, transmitted_metadata)
class RoundTripTest(unittest.TestCase):
def testZeroMessageRoundTrip(self):
test_operation_id = object()
test_group = 'test package.Test Group'
test_method = 'test method'
identity_transformation = {(test_group, test_method): _IDENTITY}
test_code = _intermediary_low.Code.OK
test_message = 'a test message'
service_link = service.service_link(
identity_transformation, identity_transformation)
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port(0, None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, 'localhost', identity_transformation, identity_transformation)
invocation_mate = test_utilities.RecordingLink()
invocation_link.join_link(invocation_mate)
invocation_link.start()
invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
None, None, None, None, links.Ticket.Termination.COMPLETION)
invocation_link.accept_ticket(invocation_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_ticket = links.Ticket(
service_mate.tickets()[-1].operation_id, 0, None, None, None, None,
None, None, None, None, test_code, test_message,
links.Ticket.Termination.COMPLETION)
service_link.accept_ticket(service_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
service_link.stop_gracefully()
self.assertIs(
service_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
self.assertIs(
invocation_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
def _perform_scenario_test(self, scenario):
test_operation_id = object()
test_group, test_method = scenario.group_and_method()
test_code = _intermediary_low.Code.OK
test_message = 'a scenario test message'
service_link = service.service_link(
{(test_group, test_method): scenario.deserialize_request},
{(test_group, test_method): scenario.serialize_response})
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port(0, None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
channel, 'localhost',
{(test_group, test_method): scenario.serialize_request},
{(test_group, test_method): scenario.deserialize_response})
invocation_mate = test_utilities.RecordingLink()
invocation_link.join_link(invocation_mate)
invocation_link.start()
invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
None, None, None, None, None)
invocation_link.accept_ticket(invocation_ticket)
requests = scenario.requests()
for request_index, request in enumerate(requests):
request_ticket = links.Ticket(
test_operation_id, 1 + request_index, None, None, None, None, 1, None,
request, None, None, None, None)
invocation_link.accept_ticket(request_ticket)
service_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index))
response_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_index, None, None,
None, None, 1, None, scenario.response_for_request(request), None,
None, None, None)
service_link.accept_ticket(response_ticket)
invocation_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index))
request_count = len(requests)
invocation_completion_ticket = links.Ticket(
test_operation_id, request_count + 1, None, None, None, None, None,
None, None, None, None, None, links.Ticket.Termination.COMPLETION)
invocation_link.accept_ticket(invocation_completion_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_completion_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_count, None, None, None,
None, None, None, None, None, test_code, test_message,
links.Ticket.Termination.COMPLETION)
service_link.accept_ticket(service_completion_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop()
service_link.stop_gracefully()
observed_requests = tuple(
ticket.payload for ticket in service_mate.tickets()
if ticket.payload is not None)
observed_responses = tuple(
ticket.payload for ticket in invocation_mate.tickets()
if ticket.payload is not None)
self.assertTrue(scenario.verify_requests(observed_requests))
self.assertTrue(scenario.verify_responses(observed_responses))
def testEmptyScenario(self):
self._perform_scenario_test(_proto_scenarios.EmptyScenario())
def testBidirectionallyUnaryScenario(self):
self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
def testBidirectionallyStreamingScenario(self):
self._perform_scenario_test(
_proto_scenarios.BidirectionallyStreamingScenario())
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,363 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The RPC-invocation-side bridge between RPC Framework and GRPC-on-the-wire."""
import abc
import enum
import logging
import threading
import time
from grpc._adapter import _intermediary_low
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
from grpc.framework.foundation import relay
from grpc.framework.interfaces.links import links
@enum.unique
class _Read(enum.Enum):
AWAITING_METADATA = 'awaiting metadata'
READING = 'reading'
AWAITING_ALLOWANCE = 'awaiting allowance'
CLOSED = 'closed'
@enum.unique
class _HighWrite(enum.Enum):
OPEN = 'open'
CLOSED = 'closed'
@enum.unique
class _LowWrite(enum.Enum):
OPEN = 'OPEN'
ACTIVE = 'ACTIVE'
CLOSED = 'CLOSED'
class _RPCState(object):
def __init__(
self, call, request_serializer, response_deserializer, sequence_number,
read, allowance, high_write, low_write):
self.call = call
self.request_serializer = request_serializer
self.response_deserializer = response_deserializer
self.sequence_number = sequence_number
self.read = read
self.allowance = allowance
self.high_write = high_write
self.low_write = low_write
class _Kernel(object):
def __init__(
self, channel, host, request_serializers, response_deserializers,
ticket_relay):
self._lock = threading.Lock()
self._channel = channel
self._host = host
self._request_serializers = request_serializers
self._response_deserializers = response_deserializers
self._relay = ticket_relay
self._completion_queue = None
self._rpc_states = None
self._pool = None
def _on_write_event(self, operation_id, unused_event, rpc_state):
if rpc_state.high_write is _HighWrite.CLOSED:
rpc_state.call.complete(operation_id)
rpc_state.low_write = _LowWrite.CLOSED
else:
ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, None, None, 1,
None, None, None, None, None, None)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
rpc_state.low_write = _LowWrite.OPEN
def _on_read_event(self, operation_id, event, rpc_state):
if event.bytes is None:
rpc_state.read = _Read.CLOSED
else:
if 0 < rpc_state.allowance:
rpc_state.allowance -= 1
rpc_state.call.read(operation_id)
else:
rpc_state.read = _Read.AWAITING_ALLOWANCE
ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, None, None, None,
None, rpc_state.response_deserializer(event.bytes), None, None, None,
None)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
def _on_metadata_event(self, operation_id, event, rpc_state):
rpc_state.allowance -= 1
rpc_state.call.read(operation_id)
rpc_state.read = _Read.READING
ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None,
links.Ticket.Subscription.FULL, None, None, event.metadata, None, None,
None, None, None)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
def _on_finish_event(self, operation_id, event, rpc_state):
self._rpc_states.pop(operation_id, None)
if event.status.code is _intermediary_low.Code.OK:
termination = links.Ticket.Termination.COMPLETION
elif event.status.code is _intermediary_low.Code.CANCELLED:
termination = links.Ticket.Termination.CANCELLATION
elif event.status.code is _intermediary_low.Code.DEADLINE_EXCEEDED:
termination = links.Ticket.Termination.EXPIRATION
else:
termination = links.Ticket.Termination.TRANSMISSION_FAILURE
ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, None, None, None,
None, None, event.metadata, event.status.code, event.status.details,
termination)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
def _spin(self, completion_queue):
while True:
event = completion_queue.get(None)
if event.kind is _intermediary_low.Event.Kind.STOP:
return
operation_id = event.tag
with self._lock:
if self._completion_queue is None:
continue
rpc_state = self._rpc_states.get(operation_id)
if rpc_state is not None:
if event.kind is _intermediary_low.Event.Kind.WRITE_ACCEPTED:
self._on_write_event(operation_id, event, rpc_state)
elif event.kind is _intermediary_low.Event.Kind.METADATA_ACCEPTED:
self._on_metadata_event(operation_id, event, rpc_state)
elif event.kind is _intermediary_low.Event.Kind.READ_ACCEPTED:
self._on_read_event(operation_id, event, rpc_state)
elif event.kind is _intermediary_low.Event.Kind.FINISH:
self._on_finish_event(operation_id, event, rpc_state)
elif event.kind is _intermediary_low.Event.Kind.COMPLETE_ACCEPTED:
pass
else:
logging.error('Illegal RPC event! %s', (event,))
def _invoke(
self, operation_id, group, method, initial_metadata, payload, termination,
timeout, allowance):
"""Invoke an RPC.
Args:
operation_id: Any object to be used as an operation ID for the RPC.
group: The group to which the RPC method belongs.
method: The RPC method name.
initial_metadata: The initial metadata object for the RPC.
payload: A payload object for the RPC or None if no payload was given at
invocation-time.
termination: A links.Ticket.Termination value or None indicated whether or
not more writes will follow from this side of the RPC.
timeout: A duration of time in seconds to allow for the RPC.
allowance: The number of payloads (beyond the free first one) that the
local ticket exchange mate has granted permission to be read.
"""
if termination is links.Ticket.Termination.COMPLETION:
high_write = _HighWrite.CLOSED
elif termination is None:
high_write = _HighWrite.OPEN
else:
return
request_serializer = self._request_serializers.get((group, method))
response_deserializer = self._response_deserializers.get((group, method))
if request_serializer is None or response_deserializer is None:
cancellation_ticket = links.Ticket(
operation_id, 0, None, None, None, None, None, None, None, None, None,
None, links.Ticket.Termination.CANCELLATION)
self._relay.add_value(cancellation_ticket)
return
call = _intermediary_low.Call(
self._channel, self._completion_queue, '/%s/%s' % (group, method),
self._host, time.time() + timeout)
if initial_metadata is not None:
for metadata_key, metadata_value in initial_metadata:
call.add_metadata(metadata_key, metadata_value)
call.invoke(self._completion_queue, operation_id, operation_id)
if payload is None:
if high_write is _HighWrite.CLOSED:
call.complete(operation_id)
low_write = _LowWrite.CLOSED
else:
low_write = _LowWrite.OPEN
else:
call.write(request_serializer(payload), operation_id)
low_write = _LowWrite.ACTIVE
self._rpc_states[operation_id] = _RPCState(
call, request_serializer, response_deserializer, 0,
_Read.AWAITING_METADATA, 1 if allowance is None else (1 + allowance),
high_write, low_write)
def _advance(self, operation_id, rpc_state, payload, termination, allowance):
if payload is not None:
rpc_state.call.write(rpc_state.request_serializer(payload), operation_id)
rpc_state.low_write = _LowWrite.ACTIVE
if allowance is not None:
if rpc_state.read is _Read.AWAITING_ALLOWANCE:
rpc_state.allowance += allowance - 1
rpc_state.call.read(operation_id)
rpc_state.read = _Read.READING
else:
rpc_state.allowance += allowance
if termination is links.Ticket.Termination.COMPLETION:
rpc_state.high_write = _HighWrite.CLOSED
if rpc_state.low_write is _LowWrite.OPEN:
rpc_state.call.complete(operation_id)
rpc_state.low_write = _LowWrite.CLOSED
elif termination is not None:
rpc_state.call.cancel()
def add_ticket(self, ticket):
with self._lock:
if self._completion_queue is None:
return
if ticket.sequence_number == 0:
self._invoke(
ticket.operation_id, ticket.group, ticket.method,
ticket.initial_metadata, ticket.payload, ticket.termination,
ticket.timeout, ticket.allowance)
else:
rpc_state = self._rpc_states.get(ticket.operation_id)
if rpc_state is not None:
self._advance(
ticket.operation_id, rpc_state, ticket.payload,
ticket.termination, ticket.allowance)
def start(self):
"""Starts this object.
This method must be called before attempting to exchange tickets with this
object.
"""
with self._lock:
self._completion_queue = _intermediary_low.CompletionQueue()
self._rpc_states = {}
self._pool = logging_pool.pool(1)
self._pool.submit(self._spin, self._completion_queue)
def stop(self):
"""Stops this object.
This method must be called for proper termination of this object, and no
attempts to exchange tickets with this object may be made after this method
has been called.
"""
with self._lock:
self._completion_queue.stop()
self._completion_queue = None
pool = self._pool
self._pool = None
self._rpc_states = None
pool.shutdown(wait=True)
class InvocationLink(links.Link, activated.Activated):
"""A links.Link for use on the invocation-side of a gRPC connection.
Implementations of this interface are only valid for use when activated.
"""
__metaclass__ = abc.ABCMeta
class _InvocationLink(InvocationLink):
def __init__(
self, channel, host, request_serializers, response_deserializers):
self._relay = relay.relay(None)
self._kernel = _Kernel(
channel, host, request_serializers, response_deserializers, self._relay)
def _start(self):
self._relay.start()
self._kernel.start()
return self
def _stop(self):
self._kernel.stop()
self._relay.stop()
def accept_ticket(self, ticket):
"""See links.Link.accept_ticket for specification."""
self._kernel.add_ticket(ticket)
def join_link(self, link):
"""See links.Link.join_link for specification."""
self._relay.set_behavior(link.accept_ticket)
def __enter__(self):
"""See activated.Activated.__enter__ for specification."""
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
"""See activated.Activated.__exit__ for specification."""
self._stop()
return False
def start(self):
"""See activated.Activated.start for specification."""
return self._start()
def stop(self):
"""See activated.Activated.stop for specification."""
self._stop()
def invocation_link(channel, host, request_serializers, response_deserializers):
"""Creates an InvocationLink.
Args:
channel: A channel for use by the link.
host: The host to specify when invoking RPCs.
request_serializers: A dict from group-method pair to request object
serialization behavior.
response_deserializers: A dict from group-method pair to response object
deserialization behavior.
Returns:
An InvocationLink.
"""
return _InvocationLink(
channel, host, request_serializers, response_deserializers)

@ -0,0 +1,402 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The RPC-service-side bridge between RPC Framework and GRPC-on-the-wire."""
import abc
import enum
import logging
import threading
import time
from grpc._adapter import _intermediary_low
from grpc.framework.foundation import logging_pool
from grpc.framework.foundation import relay
from grpc.framework.interfaces.links import links
@enum.unique
class _Read(enum.Enum):
READING = 'reading'
AWAITING_ALLOWANCE = 'awaiting allowance'
CLOSED = 'closed'
@enum.unique
class _HighWrite(enum.Enum):
OPEN = 'open'
CLOSED = 'closed'
@enum.unique
class _LowWrite(enum.Enum):
"""The possible categories of low-level write state."""
OPEN = 'OPEN'
ACTIVE = 'ACTIVE'
CLOSED = 'CLOSED'
class _RPCState(object):
def __init__(
self, request_deserializer, response_serializer, sequence_number, read,
allowance, high_write, low_write, premetadataed, terminal_metadata, code,
message):
self.request_deserializer = request_deserializer
self.response_serializer = response_serializer
self.sequence_number = sequence_number
self.read = read
self.allowance = allowance
self.high_write = high_write
self.low_write = low_write
self.premetadataed = premetadataed
self.terminal_metadata = terminal_metadata
self.code = code
self.message = message
def _metadatafy(call, metadata):
for metadata_key, metadata_value in metadata:
call.add_metadata(metadata_key, metadata_value)
class _Kernel(object):
def __init__(self, request_deserializers, response_serializers, ticket_relay):
self._lock = threading.Lock()
self._request_deserializers = request_deserializers
self._response_serializers = response_serializers
self._relay = ticket_relay
self._completion_queue = None
self._server = None
self._rpc_states = {}
self._pool = None
def _on_service_acceptance_event(self, event, server):
server.service(None)
service_acceptance = event.service_acceptance
call = service_acceptance.call
call.accept(self._completion_queue, call)
try:
group, method = service_acceptance.method.split('/')[1:3]
except ValueError:
logging.info('Illegal path "%s"!', service_acceptance.method)
return
request_deserializer = self._request_deserializers.get((group, method))
response_serializer = self._response_serializers.get((group, method))
if request_deserializer is None or response_serializer is None:
# TODO(nathaniel): Terminate the RPC with code NOT_FOUND.
call.cancel()
return
call.read(call)
self._rpc_states[call] = _RPCState(
request_deserializer, response_serializer, 1, _Read.READING, 0,
_HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None)
ticket = links.Ticket(
call, 0, group, method, links.Ticket.Subscription.FULL,
service_acceptance.deadline - time.time(), None, event.metadata, None,
None, None, None, None)
self._relay.add_value(ticket)
def _on_read_event(self, event):
call = event.tag
rpc_state = self._rpc_states.get(call, None)
if rpc_state is None:
return
if event.bytes is None:
rpc_state.read = _Read.CLOSED
payload = None
termination = links.Ticket.Termination.COMPLETION
else:
if 0 < rpc_state.allowance:
rpc_state.allowance -= 1
call.read(call)
else:
rpc_state.read = _Read.AWAITING_ALLOWANCE
payload = rpc_state.request_deserializer(event.bytes)
termination = None
ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None, None,
payload, None, None, None, termination)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
def _on_write_event(self, event):
call = event.tag
rpc_state = self._rpc_states.get(call, None)
if rpc_state is None:
return
if rpc_state.high_write is _HighWrite.CLOSED:
if rpc_state.terminal_metadata is not None:
_metadatafy(call, rpc_state.terminal_metadata)
call.status(
_intermediary_low.Status(rpc_state.code, rpc_state.message), call)
rpc_state.low_write = _LowWrite.CLOSED
else:
ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, 1, None,
None, None, None, None, None)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
rpc_state.low_write = _LowWrite.OPEN
def _on_finish_event(self, event):
call = event.tag
rpc_state = self._rpc_states.pop(call, None)
if rpc_state is None:
return
code = event.status.code
if code is _intermediary_low.Code.OK:
return
if code is _intermediary_low.Code.CANCELLED:
termination = links.Ticket.Termination.CANCELLATION
elif code is _intermediary_low.Code.DEADLINE_EXCEEDED:
termination = links.Ticket.Termination.EXPIRATION
else:
termination = links.Ticket.Termination.TRANSMISSION_FAILURE
ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None, None,
None, None, None, None, termination)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
def _spin(self, completion_queue, server):
while True:
event = completion_queue.get(None)
if event.kind is _intermediary_low.Event.Kind.STOP:
return
with self._lock:
if self._server is None:
continue
elif event.kind is _intermediary_low.Event.Kind.SERVICE_ACCEPTED:
self._on_service_acceptance_event(event, server)
elif event.kind is _intermediary_low.Event.Kind.READ_ACCEPTED:
self._on_read_event(event)
elif event.kind is _intermediary_low.Event.Kind.WRITE_ACCEPTED:
self._on_write_event(event)
elif event.kind is _intermediary_low.Event.Kind.COMPLETE_ACCEPTED:
pass
elif event.kind is _intermediary_low.Event.Kind.FINISH:
self._on_finish_event(event)
else:
logging.error('Illegal event! %s', (event,))
def add_ticket(self, ticket):
with self._lock:
if self._server is None:
return
call = ticket.operation_id
rpc_state = self._rpc_states.get(call)
if rpc_state is None:
return
if ticket.initial_metadata is not None:
_metadatafy(call, ticket.initial_metadata)
call.premetadata()
rpc_state.premetadataed = True
elif not rpc_state.premetadataed:
if (ticket.terminal_metadata is not None or
ticket.payload is not None or
ticket.termination is links.Ticket.Termination.COMPLETION or
ticket.code is not None or
ticket.message is not None):
call.premetadata()
rpc_state.premetadataed = True
if ticket.allowance is not None:
if rpc_state.read is _Read.AWAITING_ALLOWANCE:
rpc_state.allowance += ticket.allowance - 1
call.read(call)
rpc_state.read = _Read.READING
else:
rpc_state.allowance += ticket.allowance
if ticket.payload is not None:
call.write(rpc_state.response_serializer(ticket.payload), call)
rpc_state.low_write = _LowWrite.ACTIVE
if ticket.terminal_metadata is not None:
rpc_state.terminal_metadata = ticket.terminal_metadata
if ticket.code is not None:
rpc_state.code = ticket.code
if ticket.message is not None:
rpc_state.message = ticket.message
if ticket.termination is links.Ticket.Termination.COMPLETION:
rpc_state.high_write = _HighWrite.CLOSED
if rpc_state.low_write is _LowWrite.OPEN:
if rpc_state.terminal_metadata is not None:
_metadatafy(call, rpc_state.terminal_metadata)
status = _intermediary_low.Status(
_intermediary_low.Code.OK
if rpc_state.code is None else rpc_state.code,
'' if rpc_state.message is None else rpc_state.message)
call.status(status, call)
rpc_state.low_write = _LowWrite.CLOSED
elif ticket.termination is not None:
call.cancel()
self._rpc_states.pop(call, None)
def add_port(self, port, server_credentials):
with self._lock:
address = '[::]:%d' % port
if self._server is None:
self._completion_queue = _intermediary_low.CompletionQueue()
self._server = _intermediary_low.Server(self._completion_queue)
if server_credentials is None:
return self._server.add_http2_addr(address)
else:
return self._server.add_secure_http2_addr(address, server_credentials)
def start(self):
with self._lock:
if self._server is None:
self._completion_queue = _intermediary_low.CompletionQueue()
self._server = _intermediary_low.Server(self._completion_queue)
self._pool = logging_pool.pool(1)
self._pool.submit(self._spin, self._completion_queue, self._server)
self._server.start()
self._server.service(None)
def graceful_stop(self):
with self._lock:
self._server.stop()
self._server = None
self._completion_queue.stop()
self._completion_queue = None
pool = self._pool
self._pool = None
self._rpc_states = None
pool.shutdown(wait=True)
def immediate_stop(self):
# TODO(nathaniel): Implementation.
raise NotImplementedError(
'TODO(nathaniel): after merge of rewritten lower layers')
class ServiceLink(links.Link):
"""A links.Link for use on the service-side of a gRPC connection.
Implementations of this interface are only valid for use between calls to
their start method and one of their stop methods.
"""
@abc.abstractmethod
def add_port(self, port, server_credentials):
"""Adds a port on which to service RPCs after this link has been started.
Args:
port: The port on which to service RPCs, or zero to request that a port be
automatically selected and used.
server_credentials: A ServerCredentials object, or None for insecure
service.
Returns:
A port on which RPCs will be serviced after this link has been started.
"""
raise NotImplementedError()
@abc.abstractmethod
def start(self):
"""Starts this object.
This method must be called before attempting to use this Link in ticket
exchange.
"""
raise NotImplementedError()
@abc.abstractmethod
def stop_gracefully(self):
"""Stops this link.
New RPCs will be rejected as soon as this method is called, but ongoing RPCs
will be allowed to continue until they terminate. This method blocks until
all RPCs have terminated.
"""
raise NotImplementedError()
@abc.abstractmethod
def stop_immediately(self):
"""Stops this link.
All in-progress RPCs will be terminated immediately.
"""
raise NotImplementedError()
class _ServiceLink(ServiceLink):
def __init__(self, request_deserializers, response_serializers):
self._relay = relay.relay(None)
self._kernel = _Kernel(
request_deserializers, response_serializers, self._relay)
def accept_ticket(self, ticket):
self._kernel.add_ticket(ticket)
def join_link(self, link):
self._relay.set_behavior(link.accept_ticket)
def add_port(self, port, server_credentials):
return self._kernel.add_port(port, server_credentials)
def start(self):
self._relay.start()
return self._kernel.start()
def stop_gracefully(self):
self._kernel.graceful_stop()
self._relay.stop()
def stop_immediately(self):
self._kernel.immediate_stop()
self._relay.stop()
def service_link(request_deserializers, response_serializers):
"""Creates a ServiceLink.
Args:
request_deserializers: A dict from group-method pair to request object
deserialization behavior.
response_serializers: A dict from group-method pair to response ojbect
serialization behavior.
Returns:
A ServiceLink.
"""
return _ServiceLink(request_deserializers, response_serializers)

@ -0,0 +1,37 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Constants shared among tests throughout RPC Framework."""
# Value for maximum duration in seconds of RPCs that may time out as part of a
# test.
SHORT_TIMEOUT = 4
# Absurdly large value for maximum duration in seconds for should-not-time-out
# RPCs made during tests.
LONG_TIMEOUT = 3000

@ -0,0 +1,87 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Code for instructing systems under test to block or fail."""
import abc
import contextlib
import threading
class Control(object):
"""An object that accepts program control from a system under test.
Systems under test passed a Control should call its control() method
frequently during execution. The control() method may block, raise an
exception, or do nothing, all according to the enclosing test's desire for
the system under test to simulate hanging, failing, or functioning.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def control(self):
"""Potentially does anything."""
raise NotImplementedError()
class PauseFailControl(Control):
"""A Control that can be used to pause or fail code under control."""
def __init__(self):
self._condition = threading.Condition()
self._paused = False
self._fail = False
def control(self):
with self._condition:
if self._fail:
raise ValueError()
while self._paused:
self._condition.wait()
@contextlib.contextmanager
def pause(self):
"""Pauses code under control while controlling code is in context."""
with self._condition:
self._paused = True
yield
with self._condition:
self._paused = False
self._condition.notify_all()
@contextlib.contextmanager
def fail(self):
"""Fails code under control while controlling code is in context."""
with self._condition:
self._fail = True
yield
with self._condition:
self._fail = False

@ -0,0 +1,116 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Governs coverage for tests of RPCs throughout RPC Framework."""
import abc
# This code is designed for use with the unittest module.
# pylint: disable=invalid-name
class Coverage(object):
"""Specification of test coverage."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def testSuccessfulUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testSuccessfulUnaryRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testSuccessfulStreamRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testSuccessfulStreamRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testSequentialInvocations(self):
raise NotImplementedError()
@abc.abstractmethod
def testParallelInvocations(self):
raise NotImplementedError()
@abc.abstractmethod
def testWaitingForSomeButNotAllParallelInvocations(self):
raise NotImplementedError()
@abc.abstractmethod
def testCancelledUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testCancelledUnaryRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testCancelledStreamRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testCancelledStreamRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testExpiredUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testExpiredUnaryRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testExpiredStreamRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testExpiredStreamRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testFailedUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testFailedUnaryRequestStreamResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testFailedStreamRequestUnaryResponse(self):
raise NotImplementedError()
@abc.abstractmethod
def testFailedStreamRequestStreamResponse(self):
raise NotImplementedError()

@ -0,0 +1,175 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Implementations of in-order work deference."""
import abc
import enum
import threading
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
_NULL_BEHAVIOR = lambda unused_value: None
class Relay(object):
"""Performs work submitted to it in another thread.
Performs work in the order in which work was submitted to it; otherwise there
would be no reason to use an implementation of this interface instead of a
thread pool.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def add_value(self, value):
"""Adds a value to be passed to the behavior registered with this Relay.
Args:
value: A value that will be passed to a call made in another thread to the
behavior registered with this Relay.
"""
raise NotImplementedError()
@abc.abstractmethod
def set_behavior(self, behavior):
"""Sets the behavior that this Relay should call when passed values.
Args:
behavior: The behavior that this Relay should call in another thread when
passed a value, or None to have passed values ignored.
"""
raise NotImplementedError()
class _PoolRelay(activated.Activated, Relay):
@enum.unique
class _State(enum.Enum):
INACTIVE = 'inactive'
IDLE = 'idle'
SPINNING = 'spinning'
def __init__(self, pool, behavior):
self._condition = threading.Condition()
self._pool = pool
self._own_pool = pool is None
self._state = _PoolRelay._State.INACTIVE
self._activated = False
self._spinning = False
self._values = []
self._behavior = _NULL_BEHAVIOR if behavior is None else behavior
def _spin(self, behavior, value):
while True:
behavior(value)
with self._condition:
if self._values:
value = self._values.pop(0)
behavior = self._behavior
else:
self._state = _PoolRelay._State.IDLE
self._condition.notify_all()
break
def add_value(self, value):
with self._condition:
if self._state is _PoolRelay._State.INACTIVE:
raise ValueError('add_value not valid on inactive Relay!')
elif self._state is _PoolRelay._State.IDLE:
self._pool.submit(self._spin, self._behavior, value)
self._state = _PoolRelay._State.SPINNING
else:
self._values.append(value)
def set_behavior(self, behavior):
with self._condition:
self._behavior = _NULL_BEHAVIOR if behavior is None else behavior
def _start(self):
with self._condition:
self._state = _PoolRelay._State.IDLE
if self._own_pool:
self._pool = logging_pool.pool(1)
return self
def _stop(self):
with self._condition:
while self._state is _PoolRelay._State.SPINNING:
self._condition.wait()
if self._own_pool:
self._pool.shutdown(wait=True)
self._state = _PoolRelay._State.INACTIVE
def __enter__(self):
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def relay(behavior):
"""Creates a Relay.
Args:
behavior: The behavior to be called by the created Relay, or None to have
passed values dropped until a different behavior is given to the returned
Relay later.
Returns:
An object that is both an activated.Activated and a Relay. The object is
only valid for use as a Relay when activated.
"""
return _PoolRelay(None, behavior)
def pool_relay(pool, behavior):
"""Creates a Relay that uses a given thread pool.
This object will make use of at most one thread in the given pool.
Args:
pool: A futures.ThreadPoolExecutor for use by the created Relay.
behavior: The behavior to be called by the created Relay, or None to have
passed values dropped until a different behavior is given to the returned
Relay later.
Returns:
An object that is both an activated.Activated and a Relay. The object is
only valid for use as a Relay when activated.
"""
return _PoolRelay(pool, behavior)

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,124 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The low-level ticket-exchanging-links interface of RPC Framework."""
import abc
import collections
import enum
class Ticket(
collections.namedtuple(
'Ticket',
['operation_id', 'sequence_number', 'group', 'method', 'subscription',
'timeout', 'allowance', 'initial_metadata', 'payload',
'terminal_metadata', 'code', 'message', 'termination'])):
"""A sum type for all values sent from a front to a back.
Attributes:
operation_id: A unique-with-respect-to-equality hashable object identifying
a particular operation.
sequence_number: A zero-indexed integer sequence number identifying the
ticket's place in the stream of tickets sent in one direction for the
particular operation.
group: The group to which the method of the operation belongs. Must be
present in the first ticket from invocation side to service side. Ignored
for all other tickets exchanged during the operation.
method: The name of an operation. Must be present in the first ticket from
invocation side to service side. Ignored for all other tickets exchanged
during the operation.
subscription: A Subscription value describing the interest one side has in
receiving information from the other side. Must be present in the first
ticket from either side. Ignored for all other tickets exchanged during
the operation.
timeout: A nonzero length of time (measured from the beginning of the
operation) to allow for the entire operation. Must be present in the first
ticket from invocation side to service side. Optional for all other
tickets exchanged during the operation. Receipt of a value from the other
side of the operation indicates the value in use by that side. Setting a
value on a later ticket allows either side to request time extensions (or
even time reductions!) on in-progress operations.
allowance: A positive integer granting permission for a number of payloads
to be transmitted to the communicating side of the operation, or None if
no additional allowance is being granted with this ticket.
initial_metadata: An optional metadata value communicated from one side to
the other at the beginning of the operation. May be non-None in at most
one ticket from each side. Any non-None value must appear no later than
the first payload value.
payload: A customer payload object. May be None.
terminal_metadata: A metadata value comminicated from one side to the other
at the end of the operation. May be non-None in the same ticket as
the code and message, but must be None for all earlier tickets.
code: A value communicated at operation completion. May be None.
message: A value communicated at operation completion. May be None.
termination: A Termination value describing the end of the operation, or
None if the operation has not yet terminated. If set, no further tickets
may be sent in the same direction.
"""
@enum.unique
class Subscription(enum.Enum):
"""Identifies the level of subscription of a side of an operation."""
NONE = 'none'
TERMINATION = 'termination'
FULL = 'full'
@enum.unique
class Termination(enum.Enum):
"""Identifies the termination of an operation."""
COMPLETION = 'completion'
CANCELLATION = 'cancellation'
EXPIRATION = 'expiration'
LOCAL_SHUTDOWN = 'local shutdown'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
LOCAL_FAILURE = 'local failure'
REMOTE_FAILURE = 'remote failure'
class Link(object):
"""Accepts and emits tickets."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def accept_ticket(self, ticket):
"""Accept a Ticket.
Args:
ticket: Any Ticket.
"""
raise NotImplementedError()
@abc.abstractmethod
def join_link(self, link):
"""Mates this object with a peer with which it will exchange tickets."""
raise NotImplementedError()

@ -0,0 +1,332 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of the links interface of RPC Framework."""
# unittest is referenced from specification in this module.
import abc
import unittest # pylint: disable=unused-import
from grpc.framework.common import test_constants
from grpc.framework.interfaces.links import links
from grpc.framework.interfaces.links import test_utilities
def at_least_n_payloads_received_predicate(n):
def predicate(ticket_sequence):
payload_count = 0
for ticket in ticket_sequence:
if ticket.payload is not None:
payload_count += 1
if n <= payload_count:
return True
else:
return False
return predicate
def terminated(ticket_sequence):
return ticket_sequence and ticket_sequence[-1].termination is not None
_TRANSMISSION_GROUP = 'test.Group'
_TRANSMISSION_METHOD = 'TestMethod'
class TransmissionTest(object):
"""Tests ticket transmission between two connected links.
This class must be mixed into a unittest.TestCase that implements the abstract
methods it provides.
"""
__metaclass__ = abc.ABCMeta
# This is a unittest.TestCase mix-in.
# pylint: disable=invalid-name
@abc.abstractmethod
def create_transmitting_links(self):
"""Creates two connected links for use in this test.
Returns:
Two links.Links, the first of which will be used on the invocation side
of RPCs and the second of which will be used on the service side of
RPCs.
"""
raise NotImplementedError()
@abc.abstractmethod
def destroy_transmitting_links(self, invocation_side_link, service_side_link):
"""Destroys the two connected links created for this test.
Args:
invocation_side_link: The link used on the invocation side of RPCs in
this test.
service_side_link: The link used on the service side of RPCs in this
test.
"""
raise NotImplementedError()
@abc.abstractmethod
def create_invocation_initial_metadata(self):
"""Creates a value for use as invocation-side initial metadata.
Returns:
A metadata value appropriate for use as invocation-side initial metadata
or None if invocation-side initial metadata transmission is not
supported by the links under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def create_invocation_terminal_metadata(self):
"""Creates a value for use as invocation-side terminal metadata.
Returns:
A metadata value appropriate for use as invocation-side terminal
metadata or None if invocation-side terminal metadata transmission is
not supported by the links under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def create_service_initial_metadata(self):
"""Creates a value for use as service-side initial metadata.
Returns:
A metadata value appropriate for use as service-side initial metadata or
None if service-side initial metadata transmission is not supported by
the links under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def create_service_terminal_metadata(self):
"""Creates a value for use as service-side terminal metadata.
Returns:
A metadata value appropriate for use as service-side terminal metadata or
None if service-side terminal metadata transmission is not supported by
the links under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def create_invocation_completion(self):
"""Creates values for use as invocation-side code and message.
Returns:
An invocation-side code value and an invocation-side message value.
Either or both may be None if invocation-side code and/or
invocation-side message transmission is not supported by the links
under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def create_service_completion(self):
"""Creates values for use as service-side code and message.
Returns:
A service-side code value and a service-side message value. Either or
both may be None if service-side code and/or service-side message
transmission is not supported by the links under test.
"""
raise NotImplementedError()
@abc.abstractmethod
def assertMetadataEqual(self, original_metadata, transmitted_metadata):
"""Asserts that two metadata objects are equal.
Args:
original_metadata: A metadata object used in this test.
transmitted_metadata: A metadata object obtained after transmission
through the system under test.
Raises:
AssertionError: if the two metadata objects are not equal.
"""
raise NotImplementedError()
def group_and_method(self):
"""Returns the group and method used in this test case.
Returns:
A pair of the group and method used in this test case.
"""
return _TRANSMISSION_GROUP, _TRANSMISSION_METHOD
def serialize_request(self, request):
"""Serializes a request value used in this test case.
Args:
request: A request value created by this test case.
Returns:
A bytestring that is the serialization of the given request.
"""
return request
def deserialize_request(self, serialized_request):
"""Deserializes a request value used in this test case.
Args:
serialized_request: A bytestring that is the serialization of some request
used in this test case.
Returns:
The request value encoded by the given bytestring.
"""
return serialized_request
def serialize_response(self, response):
"""Serializes a response value used in this test case.
Args:
response: A response value created by this test case.
Returns:
A bytestring that is the serialization of the given response.
"""
return response
def deserialize_response(self, serialized_response):
"""Deserializes a response value used in this test case.
Args:
serialized_response: A bytestring that is the serialization of some
response used in this test case.
Returns:
The response value encoded by the given bytestring.
"""
return serialized_response
def _assert_is_valid_metadata_payload_sequence(
self, ticket_sequence, payloads, initial_metadata, terminal_metadata):
initial_metadata_seen = False
seen_payloads = []
terminal_metadata_seen = False
for ticket in ticket_sequence:
if ticket.initial_metadata is not None:
self.assertFalse(initial_metadata_seen)
self.assertFalse(seen_payloads)
self.assertFalse(terminal_metadata_seen)
self.assertMetadataEqual(initial_metadata, ticket.initial_metadata)
initial_metadata_seen = True
if ticket.payload is not None:
self.assertFalse(terminal_metadata_seen)
seen_payloads.append(ticket.payload)
if ticket.terminal_metadata is not None:
self.assertFalse(terminal_metadata_seen)
self.assertMetadataEqual(terminal_metadata, ticket.terminal_metadata)
terminal_metadata_seen = True
self.assertSequenceEqual(payloads, seen_payloads)
def _assert_is_valid_invocation_sequence(
self, ticket_sequence, group, method, payloads, initial_metadata,
terminal_metadata, termination):
self.assertLess(0, len(ticket_sequence))
self.assertEqual(group, ticket_sequence[0].group)
self.assertEqual(method, ticket_sequence[0].method)
self._assert_is_valid_metadata_payload_sequence(
ticket_sequence, payloads, initial_metadata, terminal_metadata)
self.assertIs(termination, ticket_sequence[-1].termination)
def _assert_is_valid_service_sequence(
self, ticket_sequence, payloads, initial_metadata, terminal_metadata,
code, message, termination):
self.assertLess(0, len(ticket_sequence))
self._assert_is_valid_metadata_payload_sequence(
ticket_sequence, payloads, initial_metadata, terminal_metadata)
self.assertEqual(code, ticket_sequence[-1].code)
self.assertEqual(message, ticket_sequence[-1].message)
self.assertIs(termination, ticket_sequence[-1].termination)
def setUp(self):
self._invocation_link, self._service_link = self.create_transmitting_links()
self._invocation_mate = test_utilities.RecordingLink()
self._service_mate = test_utilities.RecordingLink()
self._invocation_link.join_link(self._invocation_mate)
self._service_link.join_link(self._service_mate)
def tearDown(self):
self.destroy_transmitting_links(self._invocation_link, self._service_link)
def testSimplestRoundTrip(self):
"""Tests transmission of one ticket in each direction."""
invocation_operation_id = object()
invocation_payload = b'\x07' * 1023
timeout = test_constants.LONG_TIMEOUT
invocation_initial_metadata = self.create_invocation_initial_metadata()
invocation_terminal_metadata = self.create_invocation_terminal_metadata()
invocation_code, invocation_message = self.create_invocation_completion()
service_payload = b'\x08' * 1025
service_initial_metadata = self.create_service_initial_metadata()
service_terminal_metadata = self.create_service_terminal_metadata()
service_code, service_message = self.create_service_completion()
original_invocation_ticket = links.Ticket(
invocation_operation_id, 0, _TRANSMISSION_GROUP, _TRANSMISSION_METHOD,
links.Ticket.Subscription.FULL, timeout, 0, invocation_initial_metadata,
invocation_payload, invocation_terminal_metadata, invocation_code,
invocation_message, links.Ticket.Termination.COMPLETION)
self._invocation_link.accept_ticket(original_invocation_ticket)
# TODO(nathaniel): This shouldn't be necessary. Detecting the end of the
# invocation-side ticket sequence shouldn't require granting allowance for
# another payload.
self._service_mate.block_until_tickets_satisfy(
at_least_n_payloads_received_predicate(1))
service_operation_id = self._service_mate.tickets()[0].operation_id
self._service_link.accept_ticket(
links.Ticket(
service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
None, 1, None, None, None, None, None, None))
self._service_mate.block_until_tickets_satisfy(terminated)
self._assert_is_valid_invocation_sequence(
self._service_mate.tickets(), _TRANSMISSION_GROUP, _TRANSMISSION_METHOD,
(invocation_payload,), invocation_initial_metadata,
invocation_terminal_metadata, links.Ticket.Termination.COMPLETION)
original_service_ticket = links.Ticket(
service_operation_id, 1, None, None, links.Ticket.Subscription.FULL,
timeout, 0, service_initial_metadata, service_payload,
service_terminal_metadata, service_code, service_message,
links.Ticket.Termination.COMPLETION)
self._service_link.accept_ticket(original_service_ticket)
self._invocation_mate.block_until_tickets_satisfy(terminated)
self._assert_is_valid_service_sequence(
self._invocation_mate.tickets(), (service_payload,),
service_initial_metadata, service_terminal_metadata, service_code,
service_message, links.Ticket.Termination.COMPLETION)

@ -0,0 +1,66 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State and behavior appropriate for use in tests."""
import threading
from grpc.framework.interfaces.links import links
class RecordingLink(links.Link):
"""A Link that records every ticket passed to it."""
def __init__(self):
self._condition = threading.Condition()
self._tickets = []
def accept_ticket(self, ticket):
with self._condition:
self._tickets.append(ticket)
self._condition.notify_all()
def join_link(self, link):
pass
def block_until_tickets_satisfy(self, predicate):
"""Blocks until the received tickets satisfy the given predicate.
Args:
predicate: A callable that takes a sequence of tickets and returns a
boolean value.
"""
with self._condition:
while not predicate(self._tickets):
self._condition.wait()
def tickets(self):
"""Returns a copy of the list of all tickets received by this Link."""
with self._condition:
return tuple(self._tickets)

@ -0,0 +1,44 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities provided as part of the links interface."""
from grpc.framework.interfaces.links import links
class _NullLink(links.Link):
"""A do-nothing links.Link."""
def accept_ticket(self, ticket):
pass
def join_link(self, link):
pass
NULL_LINK = _NullLink()

@ -76,6 +76,7 @@ _PACKAGES = (
'grpc', 'grpc',
'grpc._adapter', 'grpc._adapter',
'grpc._junkdrawer', 'grpc._junkdrawer',
'grpc._links',
'grpc.early_adopter', 'grpc.early_adopter',
'grpc.framework', 'grpc.framework',
'grpc.framework.alpha', 'grpc.framework.alpha',
@ -84,12 +85,15 @@ _PACKAGES = (
'grpc.framework.face', 'grpc.framework.face',
'grpc.framework.face.testing', 'grpc.framework.face.testing',
'grpc.framework.foundation', 'grpc.framework.foundation',
'grpc.framework.interfaces',
'grpc.framework.interfaces.links',
) )
_PACKAGE_DIRECTORIES = { _PACKAGE_DIRECTORIES = {
'grpc': 'grpc', 'grpc': 'grpc',
'grpc._adapter': 'grpc/_adapter', 'grpc._adapter': 'grpc/_adapter',
'grpc._junkdrawer': 'grpc/_junkdrawer', 'grpc._junkdrawer': 'grpc/_junkdrawer',
'grpc._links': 'grpc/_links',
'grpc.early_adopter': 'grpc/early_adopter', 'grpc.early_adopter': 'grpc/early_adopter',
'grpc.framework': 'grpc/framework', 'grpc.framework': 'grpc/framework',
} }

@ -47,6 +47,18 @@
"2.7" "2.7"
] ]
}, },
{
"module": "grpc._links._lonely_invocation_link_test",
"pythonVersions": [
"2.7"
]
},
{
"module": "grpc._links._transmission_test",
"pythonVersions": [
"2.7"
]
},
{ {
"module": "grpc.early_adopter.implementations_test", "module": "grpc.early_adopter.implementations_test",
"pythonVersions": [ "pythonVersions": [

Loading…
Cancel
Save