Merge pull request #668 from nathanielmanistaatgoogle/assembly

The framework.assembly package.
pull/640/head
Masood Malekghassemi 10 years ago
commit 04608c4134
  1. 3
      src/python/src/grpc/_adapter/_face_test_case.py
  2. 9
      src/python/src/grpc/_adapter/_links_test.py
  3. 50
      src/python/src/grpc/_adapter/fore.py
  4. 25
      src/python/src/grpc/_adapter/rear.py
  5. 3
      src/python/src/grpc/early_adopter/implementations.py
  6. 30
      src/python/src/grpc/framework/assembly/__init__.py
  7. 305
      src/python/src/grpc/framework/assembly/implementations.py
  8. 284
      src/python/src/grpc/framework/assembly/implementations_test.py
  9. 91
      src/python/src/grpc/framework/assembly/interfaces.py
  10. 179
      src/python/src/grpc/framework/assembly/utilities.py
  11. 40
      src/python/src/grpc/framework/common/style.py
  12. 221
      src/python/src/grpc/framework/face/utilities.py
  13. 65
      src/python/src/grpc/framework/foundation/activated.py
  14. 1
      src/python/src/setup.py
  15. 13
      tools/dockerfile/grpc_python/Dockerfile
  16. 1
      tools/run_tests/run_python.sh

@ -81,7 +81,8 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
fore_link = fore.ForeLink(
pool, serialization.request_deserializers,
serialization.response_serializers, None, ())
port = fore_link.start()
fore_link.start()
port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, pool,
serialization.request_serializers, serialization.response_deserializers)

@ -70,7 +70,8 @@ class RoundTripTest(unittest.TestCase):
self.fore_link_pool, {test_method: None}, {test_method: None}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
port = fore_link.start()
fore_link.start()
port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: None},
@ -123,7 +124,8 @@ class RoundTripTest(unittest.TestCase):
{test_method: _IDENTITY}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
port = fore_link.start()
fore_link.start()
port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: _IDENTITY},
@ -185,7 +187,8 @@ class RoundTripTest(unittest.TestCase):
{test_method: scenario.serialize_response}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
port = fore_link.start()
fore_link.start()
port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool,

@ -40,6 +40,7 @@ from grpc.framework.base import interfaces
from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import activated
@enum.unique
@ -65,7 +66,7 @@ def _status(call, rpc_state):
rpc_state.write.low = _LowWrite.CLOSED
class ForeLink(ticket_interfaces.ForeLink):
class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
"""A service-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
@ -92,13 +93,14 @@ class ForeLink(ticket_interfaces.ForeLink):
self._response_serializers = response_serializers
self._root_certificates = root_certificates
self._key_chain_pairs = key_chain_pairs
self._port = port
self._requested_port = port
self._rear_link = null.NULL_REAR_LINK
self._completion_queue = None
self._server = None
self._rpc_states = {}
self._spinning = False
self._port = None
def _on_stop_event(self):
self._spinning = False
@ -264,23 +266,24 @@ class ForeLink(ticket_interfaces.ForeLink):
"""See ticket_interfaces.ForeLink.join_rear_link for specification."""
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
def start(self):
def _start(self):
"""Starts this ForeLink.
This method must be called before attempting to exchange tickets with this
object.
"""
with self._condition:
address = '[::]:%d' % (0 if self._port is None else self._port)
address = '[::]:%d' % (
0 if self._requested_port is None else self._requested_port)
self._completion_queue = _low.CompletionQueue()
if self._root_certificates is None and not self._key_chain_pairs:
self._server = _low.Server(self._completion_queue, None)
port = self._server.add_http2_addr(address)
self._port = self._server.add_http2_addr(address)
else:
server_credentials = _low.ServerCredentials(
self._root_certificates, self._key_chain_pairs)
self._server = _low.Server(self._completion_queue, server_credentials)
port = self._server.add_secure_http2_addr(address)
self._port = self._server.add_secure_http2_addr(address)
self._server.start()
self._server.service(None)
@ -288,11 +291,11 @@ class ForeLink(ticket_interfaces.ForeLink):
self._pool.submit(self._spin, self._completion_queue, self._server)
self._spinning = True
return port
return self
# TODO(nathaniel): Expose graceful-shutdown semantics in which this object
# enters a state in which it finishes ongoing RPCs but refuses new ones.
def stop(self):
def _stop(self):
"""Stops this ForeLink.
This method must be called for proper termination of this object, and no
@ -301,7 +304,7 @@ class ForeLink(ticket_interfaces.ForeLink):
"""
with self._condition:
self._server.stop()
# TODO(b/18904187): Yep, this is weird. Deleting a server shouldn't have a
# TODO(nathaniel): Yep, this is weird. Deleting a server shouldn't have a
# behaviorally significant side-effect.
self._server = None
self._completion_queue.stop()
@ -309,6 +312,35 @@ class ForeLink(ticket_interfaces.ForeLink):
while self._spinning:
self._condition.wait()
self._port = None
def __enter__(self):
"""See activated.Activated.__enter__ for specification."""
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
"""See activated.Activated.__exit__ for specification."""
self._stop()
return False
def start(self):
"""See activated.Activated.start for specification."""
return self._start()
def stop(self):
"""See activated.Activated.stop for specification."""
self._stop()
def port(self):
"""Identifies the port on which this ForeLink is servicing RPCs.
Returns:
The number of the port on which this ForeLink is servicing RPCs, or None
if this ForeLink is not currently activated and servicing RPCs.
"""
with self._condition:
return self._port
def accept_back_to_front_ticket(self, ticket):
"""See ticket_interfaces.ForeLink.accept_back_to_front_ticket for spec."""
with self._condition:

@ -39,6 +39,7 @@ from grpc._adapter import _low
from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import activated
_INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED,
@ -84,7 +85,7 @@ def _write(operation_id, call, outstanding, write_state, serialized_payload):
raise ValueError('Write attempted after writes completed!')
class RearLink(ticket_interfaces.RearLink):
class RearLink(ticket_interfaces.RearLink, activated.Activated):
"""An invocation-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
@ -297,7 +298,7 @@ class RearLink(ticket_interfaces.RearLink):
with self._condition:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
def start(self):
def _start(self):
"""Starts this RearLink.
This method must be called before attempting to exchange tickets with this
@ -306,8 +307,9 @@ class RearLink(ticket_interfaces.RearLink):
with self._condition:
self._completion_queue = _low.CompletionQueue()
self._channel = _low.Channel('%s:%d' % (self._host, self._port))
return self
def stop(self):
def _stop(self):
"""Stops this RearLink.
This method must be called for proper termination of this object, and no
@ -321,6 +323,23 @@ class RearLink(ticket_interfaces.RearLink):
while self._spinning:
self._condition.wait()
def __enter__(self):
"""See activated.Activated.__enter__ for specification."""
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
"""See activated.Activated.__exit__ for specification."""
self._stop()
return False
def start(self):
"""See activated.Activated.start for specification."""
return self._start()
def stop(self):
"""See activated.Activated.stop for specification."""
self._stop()
def accept_front_to_back_ticket(self, ticket):
"""See ticket_interfaces.RearLink.accept_front_to_back_ticket for spec."""
with self._condition:

@ -70,7 +70,8 @@ class _Server(interfaces.Server):
self._pool, self._breakdown.request_deserializers,
self._breakdown.response_serializers, None,
((self._private_key, self._certificate_chain),), port=self._port)
port = self._fore_link.start()
self._fore_link.start()
port = self._fore_link.port()
self._back = _tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT,
_MEGA_TIMEOUT)

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,305 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Implementations for assembling RPC framework values."""
import threading
from grpc.framework.assembly import interfaces
from grpc.framework.base import util as base_utilities
from grpc.framework.base.packets import implementations as tickets_implementations
from grpc.framework.base.packets import interfaces as tickets_interfaces
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.face import implementations as face_implementations
from grpc.framework.face import interfaces as face_interfaces
from grpc.framework.face import utilities as face_utilities
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_THREAD_POOL_SIZE = 100
class _FaceStub(object):
def __init__(self, rear_link):
self._rear_link = rear_link
self._lock = threading.Lock()
self._pool = None
self._front = None
self._under_stub = None
def __enter__(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._front = tickets_implementations.front(
self._pool, self._pool, self._pool)
self._rear_link.start()
self._rear_link.join_fore_link(self._front)
self._front.join_rear_link(self._rear_link)
self._under_stub = face_implementations.stub(self._front, self._pool)
def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
self._under_stub = None
self._rear_link.stop()
base_utilities.wait_for_idle(self._front)
self._front = None
self._pool.shutdown(wait=True)
self._pool = None
return False
def __getattr__(self, attr):
with self._lock:
if self._under_stub is None:
raise ValueError('Called out of context!')
else:
return getattr(self._under_stub, attr)
def _behaviors(implementations, front, pool):
behaviors = {}
stub = face_implementations.stub(front, pool)
for name, implementation in implementations.iteritems():
if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
behaviors[name] = stub.unary_unary_sync_async(name)
elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
behaviors[name] = lambda request, context, bound_name=name: (
stub.inline_value_in_stream_out(bound_name, request, context))
elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
behaviors[name] = stub.stream_unary_sync_async(name)
elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
behaviors[name] = lambda request_iterator, context, bound_name=name: (
stub.inline_stream_in_stream_out(
bound_name, request_iterator, context))
return behaviors
class _DynamicInlineStub(object):
def __init__(self, implementations, rear_link):
self._implementations = implementations
self._rear_link = rear_link
self._lock = threading.Lock()
self._pool = None
self._front = None
self._behaviors = None
def __enter__(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._front = tickets_implementations.front(
self._pool, self._pool, self._pool)
self._rear_link.start()
self._rear_link.join_fore_link(self._front)
self._front.join_rear_link(self._rear_link)
self._behaviors = _behaviors(
self._implementations, self._front, self._pool)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
self._behaviors = None
self._rear_link.stop()
base_utilities.wait_for_idle(self._front)
self._front = None
self._pool.shutdown(wait=True)
self._pool = None
return False
def __getattr__(self, attr):
with self._lock:
behavior = self._behaviors.get(attr)
if behavior is None:
raise AttributeError(attr)
else:
return behavior
def _servicer(implementations, pool):
inline_value_in_value_out_methods = {}
inline_value_in_stream_out_methods = {}
inline_stream_in_value_out_methods = {}
inline_stream_in_stream_out_methods = {}
event_value_in_value_out_methods = {}
event_value_in_stream_out_methods = {}
event_stream_in_value_out_methods = {}
event_stream_in_stream_out_methods = {}
for name, implementation in implementations.iteritems():
if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
if implementation.style is style.Service.INLINE:
inline_value_in_value_out_methods[name] = (
face_utilities.inline_unary_unary_method(implementation.unary_unary_inline))
elif implementation.style is style.Service.EVENT:
event_value_in_value_out_methods[name] = (
face_utilities.event_unary_unary_method(implementation.unary_unary_event))
elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
if implementation.style is style.Service.INLINE:
inline_value_in_stream_out_methods[name] = (
face_utilities.inline_unary_stream_method(implementation.unary_stream_inline))
elif implementation.style is style.Service.EVENT:
event_value_in_stream_out_methods[name] = (
face_utilities.event_unary_stream_method(implementation.unary_stream_event))
if implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
if implementation.style is style.Service.INLINE:
inline_stream_in_value_out_methods[name] = (
face_utilities.inline_stream_unary_method(implementation.stream_unary_inline))
elif implementation.style is style.Service.EVENT:
event_stream_in_value_out_methods[name] = (
face_utilities.event_stream_unary_method(implementation.stream_unary_event))
elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
if implementation.style is style.Service.INLINE:
inline_stream_in_stream_out_methods[name] = (
face_utilities.inline_stream_stream_method(implementation.stream_stream_inline))
elif implementation.style is style.Service.EVENT:
event_stream_in_stream_out_methods[name] = (
face_utilities.event_stream_stream_method(implementation.stream_stream_event))
return face_implementations.servicer(
pool,
inline_value_in_value_out_methods=inline_value_in_value_out_methods,
inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
event_value_in_value_out_methods=event_value_in_value_out_methods,
event_value_in_stream_out_methods=event_value_in_stream_out_methods,
event_stream_in_value_out_methods=event_stream_in_value_out_methods,
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods)
class _ServiceAssembly(activated.Activated):
def __init__(self, implementations, fore_link):
self._implementations = implementations
self._fore_link = fore_link
self._lock = threading.Lock()
self._pool = None
self._back = None
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
servicer = _servicer(self._implementations, self._pool)
self._back = tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
_ONE_DAY_IN_SECONDS)
self._fore_link.start()
self._fore_link.join_rear_link(self._back)
self._back.join_fore_link(self._fore_link)
def _stop(self):
with self._lock:
self._fore_link.stop()
base_utilities.wait_for_idle(self._back)
self._back = None
self._pool.shutdown(wait=True)
self._pool = None
def __enter__(self):
self._start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def assemble_face_stub(activated_rear_link):
"""Assembles a face_interfaces.Stub.
The returned object is a context manager and may only be used in context to
invoke RPCs.
Args:
activated_rear_link: An object that is both a tickets_interfaces.RearLink
and an activated.Activated. The object should be in the inactive state
when passed to this method.
Returns:
A face_interfaces.Stub on which, in context, RPCs can be invoked.
"""
return _FaceStub(activated_rear_link)
def assemble_dynamic_inline_stub(implementations, activated_rear_link):
"""Assembles a stub with method names for attributes.
The returned object is a context manager and may only be used in context to
invoke RPCs.
The returned object, when used in context, will respond to attribute access
as follows: if the requested attribute is the name of a unary-unary RPC
method, the value of the attribute will be a
face_interfaces.UnaryUnarySyncAsync with which to invoke the RPC method. If
the requested attribute is the name of a unary-stream RPC method, the value
of the attribute will be a callable with the semantics of
face_interfaces.Stub.inline_value_in_stream_out, minus the "name" parameter,
with which to invoke the RPC method. If the requested attribute is the name
of a stream-unary RPC method, the value of the attribute will be a
face_interfaces.StreamUnarySyncAsync with which to invoke the RPC method. If
the requested attribute is the name of a stream-stream RPC method, the value
of the attribute will be a callable with the semantics of
face_interfaces.Stub.inline_stream_in_stream_out, minus the "name" parameter,
with which to invoke the RPC method.
Args:
implementations: A dictionary from RPC method name to
interfaces.MethodImplementation.
activated_rear_link: An object that is both a tickets_interfaces.RearLink
and an activated.Activated. The object should be in the inactive state
when passed to this method.
Returns:
A stub on which, in context, RPCs can be invoked.
"""
return _DynamicInlineStub(implementations, activated_rear_link)
def assemble_service(implementations, activated_fore_link):
"""Assembles the service-side of the RPC Framework stack.
Args:
implementations: A dictionary from RPC method name to
interfaces.MethodImplementation.
activated_fore_link: An object that is both a tickets_interfaces.ForeLink
and an activated.Activated. The object should be in the inactive state
when passed to this method.
Returns:
An activated.Activated value encapsulating RPC service.
"""
return _ServiceAssembly(implementations, activated_fore_link)

@ -0,0 +1,284 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# TODO(nathaniel): Expand this test coverage.
"""Test of the GRPC-backed ForeLink and RearLink."""
import threading
import unittest
from grpc.framework.assembly import implementations
from grpc.framework.assembly import utilities
from grpc.framework.base import interfaces
from grpc.framework.base.packets import packets as tickets
from grpc.framework.base.packets import interfaces as tickets_interfaces
from grpc.framework.base.packets import null
from grpc.framework.foundation import logging_pool
from grpc._junkdrawer import math_pb2
DIV = 'Div'
DIV_MANY = 'DivMany'
FIB = 'Fib'
SUM = 'Sum'
def _fibbonacci(limit):
left, right = 0, 1
for _ in xrange(limit):
yield left
left, right = right, left + right
def _div(request, unused_context):
return math_pb2.DivReply(
quotient=request.dividend / request.divisor,
remainder=request.dividend % request.divisor)
def _div_many(request_iterator, unused_context):
for request in request_iterator:
yield math_pb2.DivReply(
quotient=request.dividend / request.divisor,
remainder=request.dividend % request.divisor)
def _fib(request, unused_context):
for number in _fibbonacci(request.limit):
yield math_pb2.Num(num=number)
def _sum(request_iterator, unused_context):
accumulation = 0
for request in request_iterator:
accumulation += request.num
return math_pb2.Num(num=accumulation)
_IMPLEMENTATIONS = {
DIV: utilities.unary_unary_inline(_div),
DIV_MANY: utilities.stream_stream_inline(_div_many),
FIB: utilities.unary_stream_inline(_fib),
SUM: utilities.stream_unary_inline(_sum),
}
_TIMEOUT = 10
class PipeLink(tickets_interfaces.ForeLink, tickets_interfaces.RearLink):
def __init__(self):
self._fore_lock = threading.Lock()
self._fore_link = null.NULL_FORE_LINK
self._rear_lock = threading.Lock()
self._rear_link = null.NULL_REAR_LINK
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return False
def start(self):
pass
def stop(self):
pass
def accept_back_to_front_ticket(self, ticket):
with self._fore_lock:
self._fore_link.accept_back_to_front_ticket(ticket)
def join_rear_link(self, rear_link):
with self._rear_lock:
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
def accept_front_to_back_ticket(self, ticket):
with self._rear_lock:
self._rear_link.accept_front_to_back_ticket(ticket)
def join_fore_link(self, fore_link):
with self._fore_lock:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
class FaceStubTest(unittest.TestCase):
def testUnaryUnary(self):
divisor = 7
dividend = 13
expected_quotient = 1
expected_remainder = 6
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
face_stub = implementations.assemble_face_stub(pipe)
service.start()
try:
with face_stub:
response = face_stub.blocking_value_in_value_out(
DIV, math_pb2.DivArgs(divisor=divisor, dividend=dividend),
_TIMEOUT)
self.assertEqual(expected_quotient, response.quotient)
self.assertEqual(expected_remainder, response.remainder)
finally:
service.stop()
def testUnaryStream(self):
stream_length = 29
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
face_stub = implementations.assemble_face_stub(pipe)
with service, face_stub:
responses = list(
face_stub.inline_value_in_stream_out(
FIB, math_pb2.FibArgs(limit=stream_length), _TIMEOUT))
numbers = [response.num for response in responses]
for early, middle, later in zip(numbers, numbers[1:], numbers[2:]):
self.assertEqual(early + middle, later)
def testStreamUnary(self):
stream_length = 13
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
face_stub = implementations.assemble_face_stub(pipe)
with service, face_stub:
sync_async = face_stub.stream_unary_sync_async(SUM)
response_future = sync_async.async(
(math_pb2.Num(num=index) for index in range(stream_length)),
_TIMEOUT)
self.assertEqual(
(stream_length * (stream_length - 1)) / 2,
response_future.result().num)
def testStreamStream(self):
stream_length = 17
divisor_offset = 7
dividend_offset = 17
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
face_stub = implementations.assemble_face_stub(pipe)
with service, face_stub:
response_iterator = face_stub.inline_stream_in_stream_out(
DIV_MANY,
(math_pb2.DivArgs(
divisor=divisor_offset + index,
dividend=dividend_offset + index)
for index in range(stream_length)),
_TIMEOUT)
for index, response in enumerate(response_iterator):
self.assertEqual(
(dividend_offset + index) / (divisor_offset + index),
response.quotient)
self.assertEqual(
(dividend_offset + index) % (divisor_offset + index),
response.remainder)
self.assertEqual(stream_length, index + 1)
class DynamicInlineStubTest(unittest.TestCase):
def testUnaryUnary(self):
divisor = 59
dividend = 973
expected_quotient = dividend / divisor
expected_remainder = dividend % divisor
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_IMPLEMENTATIONS, pipe)
service.start()
with dynamic_stub:
response = dynamic_stub.Div(
math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT)
self.assertEqual(expected_quotient, response.quotient)
self.assertEqual(expected_remainder, response.remainder)
service.stop()
def testUnaryStream(self):
stream_length = 43
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_IMPLEMENTATIONS, pipe)
with service, dynamic_stub:
response_iterator = dynamic_stub.Fib(
math_pb2.FibArgs(limit=stream_length), _TIMEOUT)
numbers = tuple(response.num for response in response_iterator)
for early, middle, later in zip(numbers, numbers[:1], numbers[:2]):
self.assertEqual(early + middle, later)
self.assertEqual(stream_length, len(numbers))
def testStreamUnary(self):
stream_length = 127
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_IMPLEMENTATIONS, pipe)
with service, dynamic_stub:
response_future = dynamic_stub.Sum.async(
(math_pb2.Num(num=index) for index in range(stream_length)),
_TIMEOUT)
self.assertEqual(
(stream_length * (stream_length - 1)) / 2,
response_future.result().num)
def testStreamStream(self):
stream_length = 179
divisor_offset = 71
dividend_offset = 1763
pipe = PipeLink()
service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
dynamic_stub = implementations.assemble_dynamic_inline_stub(
_IMPLEMENTATIONS, pipe)
with service, dynamic_stub:
response_iterator = dynamic_stub.DivMany(
(math_pb2.DivArgs(
divisor=divisor_offset + index,
dividend=dividend_offset + index)
for index in range(stream_length)),
_TIMEOUT)
for index, response in enumerate(response_iterator):
self.assertEqual(
(dividend_offset + index) / (divisor_offset + index),
response.quotient)
self.assertEqual(
(dividend_offset + index) % (divisor_offset + index),
response.remainder)
self.assertEqual(stream_length, index + 1)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,91 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# TODO(nathaniel): The assembly layer only exists to smooth out wrinkles in
# the face layer. The two should be squashed together as soon as manageable.
"""Interfaces for assembling RPC Framework values."""
import abc
# cardinality, style, and stream are referenced from specification in this
# module.
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.common import style # pylint: disable=unused-import
from grpc.framework.foundation import stream # pylint: disable=unused-import
class MethodImplementation(object):
"""A sum type that describes an RPC method implementation.
Attributes:
cardinality: A cardinality.Cardinality value.
style: A style.Service value.
unary_unary_inline: The implementation of the RPC method as a callable
value that takes a request value and a face_interfaces.RpcContext object
and returns a response value. Only non-None if cardinality is
cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
unary_stream_inline: The implementation of the RPC method as a callable
value that takes a request value and a face_interfaces.RpcContext object
and returns an iterator of response values. Only non-None if cardinality
is cardinality.Cardinality.UNARY_STREAM and style is
style.Service.INLINE.
stream_unary_inline: The implementation of the RPC method as a callable
value that takes an iterator of request values and a
face_interfaces.RpcContext object and returns a response value. Only
non-None if cardinality is cardinality.Cardinality.STREAM_UNARY and style
is style.Service.INLINE.
stream_stream_inline: The implementation of the RPC method as a callable
value that takes an iterator of request values and a
face_interfaces.RpcContext object and returns an iterator of response
values. Only non-None if cardinality is
cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
unary_unary_event: The implementation of the RPC method as a callable value
that takes a request value, a response callback to which to pass the
response value of the RPC, and a face_interfaces.RpcContext. Only
non-None if cardinality is cardinality.Cardinality.UNARY_UNARY and style
is style.Service.EVENT.
unary_stream_event: The implementation of the RPC method as a callable
value that takes a request value, a stream.Consumer to which to pass the
the response values of the RPC, and a face_interfaces.RpcContext. Only
non-None if cardinality is cardinality.Cardinality.UNARY_STREAM and style
is style.Service.EVENT.
stream_unary_event: The implementation of the RPC method as a callable
value that takes a response callback to which to pass the response value
of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
to which the request values of the RPC should be passed. Only non-None if
cardinality is cardinality.Cardinality.STREAM_UNARY and style is
style.Service.EVENT.
stream_stream_event: The implementation of the RPC method as a callable
value that takes a stream.Consumer to which to pass the response values
of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
to which the request values of the RPC should be passed. Only non-None if
cardinality is cardinality.Cardinality.STREAM_STREAM and style is
style.Service.EVENT.
"""
__metaclass__ = abc.ABCMeta

@ -0,0 +1,179 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for assembling RPC framework values."""
import collections
from grpc.framework.assembly import interfaces
from grpc.framework.common import cardinality
from grpc.framework.common import style
from grpc.framework.face import interfaces as face_interfaces
from grpc.framework.foundation import stream
class _MethodImplementation(
interfaces.MethodImplementation,
collections.namedtuple(
'_MethodImplementation',
['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline',
'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event',
'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])):
pass
def unary_unary_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable value
that takes a request value and a face_interfaces.RpcContext object and
returns a response value.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior,
None, None, None, None, None, None, None)
def unary_stream_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value and a face_interfaces.RpcContext object
and returns an iterator of response values.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None,
behavior, None, None, None, None, None, None)
def stream_unary_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
value that takes an iterator of request values and a
face_interfaces.RpcContext object and returns a response value.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None,
behavior, None, None, None, None, None)
def stream_stream_inline(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
value that takes an iterator of request values and a
face_interfaces.RpcContext object and returns an iterator of response
values.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None,
None, behavior, None, None, None, None)
def unary_unary_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable
value that takes a request value, a response callback to which to pass
the response value of the RPC, and a face_interfaces.RpcContext.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None,
None, None, behavior, None, None, None)
def unary_stream_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value, a stream.Consumer to which to pass the
the response values of the RPC, and a face_interfaces.RpcContext.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None,
None, None, None, behavior, None, None)
def stream_unary_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
value that takes a response callback to which to pass the response value
of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
to which the request values of the RPC should be passed.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None,
None, None, None, None, behavior, None)
def stream_stream_event(behavior):
"""Creates an interfaces.MethodImplementation for the given behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
value that takes a stream.Consumer to which to pass the response values
of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
to which the request values of the RPC should be passed.
Returns:
An interfaces.MethodImplementation derived from the given behavior.
"""
return _MethodImplementation(
cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None,
None, None, None, None, None, behavior)

@ -0,0 +1,40 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Defines an enum for classifying RPC methods by control flow semantics."""
import enum
@enum.unique
class Service(enum.Enum):
"""Describes the control flow style of RPC method implementation."""
INLINE = 'inline'
EVENT = 'event'

@ -0,0 +1,221 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for the face layer of RPC Framework."""
# stream is referenced from specification in this module.
from grpc.framework.face import interfaces
from grpc.framework.foundation import stream # pylint: disable=unused-import
class _InlineUnaryUnaryMethod(interfaces.InlineValueInValueOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request, context):
return self._behavior(request, context)
class _InlineUnaryStreamMethod(interfaces.InlineValueInStreamOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request, context):
return self._behavior(request, context)
class _InlineStreamUnaryMethod(interfaces.InlineStreamInValueOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request_iterator, context):
return self._behavior(request_iterator, context)
class _InlineStreamStreamMethod(interfaces.InlineStreamInStreamOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request_iterator, context):
return self._behavior(request_iterator, context)
class _EventUnaryUnaryMethod(interfaces.EventValueInValueOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request, response_callback, context):
return self._behavior(request, response_callback, context)
class _EventUnaryStreamMethod(interfaces.EventValueInStreamOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, request, response_consumer, context):
return self._behavior(request, response_consumer, context)
class _EventStreamUnaryMethod(interfaces.EventStreamInValueOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, response_callback, context):
return self._behavior(response_callback, context)
class _EventStreamStreamMethod(interfaces.EventStreamInStreamOutMethod):
def __init__(self, behavior):
self._behavior = behavior
def service(self, response_consumer, context):
return self._behavior(response_consumer, context)
def inline_unary_unary_method(behavior):
"""Creates an interfaces.InlineValueInValueOutMethod from a behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable
value that takes a request value and an interfaces.RpcContext object and
returns a response value.
Returns:
An interfaces.InlineValueInValueOutMethod derived from the given behavior.
"""
return _InlineUnaryUnaryMethod(behavior)
def inline_unary_stream_method(behavior):
"""Creates an interfaces.InlineValueInStreamOutMethod from a behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value and an interfaces.RpcContext object and
returns an iterator of response values.
Returns:
An interfaces.InlineValueInStreamOutMethod derived from the given behavior.
"""
return _InlineUnaryStreamMethod(behavior)
def inline_stream_unary_method(behavior):
"""Creates an interfaces.InlineStreamInValueOutMethod from a behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
value that takes an iterator of request values and an
interfaces.RpcContext object and returns a response value.
Returns:
An interfaces.InlineStreamInValueOutMethod derived from the given behavior.
"""
return _InlineStreamUnaryMethod(behavior)
def inline_stream_stream_method(behavior):
"""Creates an interfaces.InlineStreamInStreamOutMethod from a behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
value that takes an iterator of request values and an
interfaces.RpcContext object and returns an iterator of response values.
Returns:
An interfaces.InlineStreamInStreamOutMethod derived from the given
behavior.
"""
return _InlineStreamStreamMethod(behavior)
def event_unary_unary_method(behavior):
"""Creates an interfaces.EventValueInValueOutMethod from a behavior.
Args:
behavior: The implementation of a unary-unary RPC method as a callable
value that takes a request value, a response callback to which to pass
the response value of the RPC, and an interfaces.RpcContext.
Returns:
An interfaces.EventValueInValueOutMethod derived from the given behavior.
"""
return _EventUnaryUnaryMethod(behavior)
def event_unary_stream_method(behavior):
"""Creates an interfaces.EventValueInStreamOutMethod from a behavior.
Args:
behavior: The implementation of a unary-stream RPC method as a callable
value that takes a request value, a stream.Consumer to which to pass the
response values of the RPC, and an interfaces.RpcContext.
Returns:
An interfaces.EventValueInStreamOutMethod derived from the given behavior.
"""
return _EventUnaryStreamMethod(behavior)
def event_stream_unary_method(behavior):
"""Creates an interfaces.EventStreamInValueOutMethod from a behavior.
Args:
behavior: The implementation of a stream-unary RPC method as a callable
value that takes a response callback to which to pass the response value
of the RPC and an interfaces.RpcContext and returns a stream.Consumer to
which the request values of the RPC should be passed.
Returns:
An interfaces.EventStreamInValueOutMethod derived from the given behavior.
"""
return _EventStreamUnaryMethod(behavior)
def event_stream_stream_method(behavior):
"""Creates an interfaces.EventStreamInStreamOutMethod from a behavior.
Args:
behavior: The implementation of a stream-stream RPC method as a callable
value that takes a stream.Consumer to which to pass the response values
of the RPC and an interfaces.RpcContext and returns a stream.Consumer to
which the request values of the RPC should be passed.
Returns:
An interfaces.EventStreamInStreamOutMethod derived from the given behavior.
"""
return _EventStreamStreamMethod(behavior)

@ -0,0 +1,65 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces related to streams of values or objects."""
import abc
class Activated(object):
"""Interface for objects that may be started and stopped.
Values implementing this type must also implement the context manager
protocol.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __enter__(self):
"""See the context manager protocol for specification."""
raise NotImplementedError()
@abc.abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
"""See the context manager protocol for specification."""
raise NotImplementedError()
@abc.abstractmethod
def start(self):
"""Activates this object.
Returns:
A value equal to the value returned by this object's __enter__ method.
"""
raise NotImplementedError()
@abc.abstractmethod
def stop(self):
"""Deactivates this object."""
raise NotImplementedError()

@ -62,6 +62,7 @@ _PACKAGES=(
'grpc._junkdrawer',
'grpc.early_adopter',
'grpc.framework',
'grpc.framework.assembly',
'grpc.framework.base',
'grpc.framework.base.packets',
'grpc.framework.common',

@ -24,12 +24,13 @@ RUN cd /var/local/git/grpc \
&& python2.7 -B -m grpc._adapter._links_test
&& python2.7 -B -m grpc._adapter._lonely_rear_link_test
&& python2.7 -B -m grpc._adapter._low_test
&& python2.7 -B -m grpc._framework.base.packets.implementations_test
&& python2.7 -B -m grpc._framework.face.blocking_invocation_inline_service_test
&& python2.7 -B -m grpc._framework.face.event_invocation_synchronous_event_service_test
&& python2.7 -B -m grpc._framework.face.future_invocation_asynchronous_event_service_test
&& python2.7 -B -m grpc._framework.foundation._later_test
&& python2.7 -B -m grpc._framework.foundation._logging_pool_test
&& python2.7 -B -m grpc.framework.assembly.implementations_test
&& python2.7 -B -m grpc.framework.base.packets.implementations_test
&& python2.7 -B -m grpc.framework.face.blocking_invocation_inline_service_test
&& python2.7 -B -m grpc.framework.face.event_invocation_synchronous_event_service_test
&& python2.7 -B -m grpc.framework.face.future_invocation_asynchronous_event_service_test
&& python2.7 -B -m grpc.framework.foundation._later_test
&& python2.7 -B -m grpc.framework.foundation._logging_pool_test
# Add a cacerts directory containing the Google root pem file, allowing the interop client to access the production test instance
ADD cacerts cacerts

@ -44,6 +44,7 @@ python2.7 -B -m grpc._adapter._future_invocation_asynchronous_event_service_test
python2.7 -B -m grpc._adapter._links_test
python2.7 -B -m grpc._adapter._lonely_rear_link_test
python2.7 -B -m grpc._adapter._low_test
python2.7 -B -m grpc.framework.assembly.implementations_test
python2.7 -B -m grpc.framework.base.packets.implementations_test
python2.7 -B -m grpc.framework.face.blocking_invocation_inline_service_test
python2.7 -B -m grpc.framework.face.event_invocation_synchronous_event_service_test

Loading…
Cancel
Save