diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py index da73366f446..8cce322d300 100644 --- a/src/python/src/grpc/_adapter/_face_test_case.py +++ b/src/python/src/grpc/_adapter/_face_test_case.py @@ -81,7 +81,8 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): fore_link = fore.ForeLink( pool, serialization.request_deserializers, serialization.response_serializers, None, ()) - port = fore_link.start() + fore_link.start() + port = fore_link.port() rear_link = rear.RearLink( 'localhost', port, pool, serialization.request_serializers, serialization.response_deserializers) diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py index ba7660bb920..6b3bcee9fa8 100644 --- a/src/python/src/grpc/_adapter/_links_test.py +++ b/src/python/src/grpc/_adapter/_links_test.py @@ -70,7 +70,8 @@ class RoundTripTest(unittest.TestCase): self.fore_link_pool, {test_method: None}, {test_method: None}, None, ()) fore_link.join_rear_link(test_rear_link) test_rear_link.join_fore_link(fore_link) - port = fore_link.start() + fore_link.start() + port = fore_link.port() rear_link = rear.RearLink( 'localhost', port, self.rear_link_pool, {test_method: None}, @@ -123,7 +124,8 @@ class RoundTripTest(unittest.TestCase): {test_method: _IDENTITY}, None, ()) fore_link.join_rear_link(test_rear_link) test_rear_link.join_fore_link(fore_link) - port = fore_link.start() + fore_link.start() + port = fore_link.port() rear_link = rear.RearLink( 'localhost', port, self.rear_link_pool, {test_method: _IDENTITY}, @@ -185,7 +187,8 @@ class RoundTripTest(unittest.TestCase): {test_method: scenario.serialize_response}, None, ()) fore_link.join_rear_link(test_rear_link) test_rear_link.join_fore_link(fore_link) - port = fore_link.start() + fore_link.start() + port = fore_link.port() rear_link = rear.RearLink( 'localhost', port, self.rear_link_pool, diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py index f72b2fd5a5f..051fc083f19 100644 --- a/src/python/src/grpc/_adapter/fore.py +++ b/src/python/src/grpc/_adapter/fore.py @@ -40,6 +40,7 @@ from grpc.framework.base import interfaces from grpc.framework.base.packets import interfaces as ticket_interfaces from grpc.framework.base.packets import null from grpc.framework.base.packets import packets as tickets +from grpc.framework.foundation import activated @enum.unique @@ -65,7 +66,7 @@ def _status(call, rpc_state): rpc_state.write.low = _LowWrite.CLOSED -class ForeLink(ticket_interfaces.ForeLink): +class ForeLink(ticket_interfaces.ForeLink, activated.Activated): """A service-side bridge between RPC Framework and the C-ish _low code.""" def __init__( @@ -92,13 +93,14 @@ class ForeLink(ticket_interfaces.ForeLink): self._response_serializers = response_serializers self._root_certificates = root_certificates self._key_chain_pairs = key_chain_pairs - self._port = port + self._requested_port = port self._rear_link = null.NULL_REAR_LINK self._completion_queue = None self._server = None self._rpc_states = {} self._spinning = False + self._port = None def _on_stop_event(self): self._spinning = False @@ -264,23 +266,24 @@ class ForeLink(ticket_interfaces.ForeLink): """See ticket_interfaces.ForeLink.join_rear_link for specification.""" self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link - def start(self): + def _start(self): """Starts this ForeLink. This method must be called before attempting to exchange tickets with this object. """ with self._condition: - address = '[::]:%d' % (0 if self._port is None else self._port) + address = '[::]:%d' % ( + 0 if self._requested_port is None else self._requested_port) self._completion_queue = _low.CompletionQueue() if self._root_certificates is None and not self._key_chain_pairs: self._server = _low.Server(self._completion_queue, None) - port = self._server.add_http2_addr(address) + self._port = self._server.add_http2_addr(address) else: server_credentials = _low.ServerCredentials( self._root_certificates, self._key_chain_pairs) self._server = _low.Server(self._completion_queue, server_credentials) - port = self._server.add_secure_http2_addr(address) + self._port = self._server.add_secure_http2_addr(address) self._server.start() self._server.service(None) @@ -288,11 +291,11 @@ class ForeLink(ticket_interfaces.ForeLink): self._pool.submit(self._spin, self._completion_queue, self._server) self._spinning = True - return port + return self # TODO(nathaniel): Expose graceful-shutdown semantics in which this object # enters a state in which it finishes ongoing RPCs but refuses new ones. - def stop(self): + def _stop(self): """Stops this ForeLink. This method must be called for proper termination of this object, and no @@ -301,7 +304,7 @@ class ForeLink(ticket_interfaces.ForeLink): """ with self._condition: self._server.stop() - # TODO(b/18904187): Yep, this is weird. Deleting a server shouldn't have a + # TODO(nathaniel): Yep, this is weird. Deleting a server shouldn't have a # behaviorally significant side-effect. self._server = None self._completion_queue.stop() @@ -309,6 +312,35 @@ class ForeLink(ticket_interfaces.ForeLink): while self._spinning: self._condition.wait() + self._port = None + + def __enter__(self): + """See activated.Activated.__enter__ for specification.""" + return self._start() + + def __exit__(self, exc_type, exc_val, exc_tb): + """See activated.Activated.__exit__ for specification.""" + self._stop() + return False + + def start(self): + """See activated.Activated.start for specification.""" + return self._start() + + def stop(self): + """See activated.Activated.stop for specification.""" + self._stop() + + def port(self): + """Identifies the port on which this ForeLink is servicing RPCs. + + Returns: + The number of the port on which this ForeLink is servicing RPCs, or None + if this ForeLink is not currently activated and servicing RPCs. + """ + with self._condition: + return self._port + def accept_back_to_front_ticket(self, ticket): """See ticket_interfaces.ForeLink.accept_back_to_front_ticket for spec.""" with self._condition: diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index c47c0aa0209..cbcf121d9ab 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -39,6 +39,7 @@ from grpc._adapter import _low from grpc.framework.base.packets import interfaces as ticket_interfaces from grpc.framework.base.packets import null from grpc.framework.base.packets import packets as tickets +from grpc.framework.foundation import activated _INVOCATION_EVENT_KINDS = ( _low.Event.Kind.METADATA_ACCEPTED, @@ -84,7 +85,7 @@ def _write(operation_id, call, outstanding, write_state, serialized_payload): raise ValueError('Write attempted after writes completed!') -class RearLink(ticket_interfaces.RearLink): +class RearLink(ticket_interfaces.RearLink, activated.Activated): """An invocation-side bridge between RPC Framework and the C-ish _low code.""" def __init__( @@ -297,7 +298,7 @@ class RearLink(ticket_interfaces.RearLink): with self._condition: self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link - def start(self): + def _start(self): """Starts this RearLink. This method must be called before attempting to exchange tickets with this @@ -306,8 +307,9 @@ class RearLink(ticket_interfaces.RearLink): with self._condition: self._completion_queue = _low.CompletionQueue() self._channel = _low.Channel('%s:%d' % (self._host, self._port)) + return self - def stop(self): + def _stop(self): """Stops this RearLink. This method must be called for proper termination of this object, and no @@ -321,6 +323,23 @@ class RearLink(ticket_interfaces.RearLink): while self._spinning: self._condition.wait() + def __enter__(self): + """See activated.Activated.__enter__ for specification.""" + return self._start() + + def __exit__(self, exc_type, exc_val, exc_tb): + """See activated.Activated.__exit__ for specification.""" + self._stop() + return False + + def start(self): + """See activated.Activated.start for specification.""" + return self._start() + + def stop(self): + """See activated.Activated.stop for specification.""" + self._stop() + def accept_front_to_back_ticket(self, ticket): """See ticket_interfaces.RearLink.accept_front_to_back_ticket for spec.""" with self._condition: diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index c549317d15d..1d76d0f9e0d 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -70,7 +70,8 @@ class _Server(interfaces.Server): self._pool, self._breakdown.request_deserializers, self._breakdown.response_serializers, None, ((self._private_key, self._certificate_chain),), port=self._port) - port = self._fore_link.start() + self._fore_link.start() + port = self._fore_link.port() self._back = _tickets_implementations.back( servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT, _MEGA_TIMEOUT) diff --git a/src/python/src/grpc/framework/assembly/__init__.py b/src/python/src/grpc/framework/assembly/__init__.py new file mode 100644 index 00000000000..70865191060 --- /dev/null +++ b/src/python/src/grpc/framework/assembly/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/src/grpc/framework/assembly/implementations.py b/src/python/src/grpc/framework/assembly/implementations.py new file mode 100644 index 00000000000..461aa9c8554 --- /dev/null +++ b/src/python/src/grpc/framework/assembly/implementations.py @@ -0,0 +1,305 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementations for assembling RPC framework values.""" + +import threading + +from grpc.framework.assembly import interfaces +from grpc.framework.base import util as base_utilities +from grpc.framework.base.packets import implementations as tickets_implementations +from grpc.framework.base.packets import interfaces as tickets_interfaces +from grpc.framework.common import cardinality +from grpc.framework.common import style +from grpc.framework.face import implementations as face_implementations +from grpc.framework.face import interfaces as face_interfaces +from grpc.framework.face import utilities as face_utilities +from grpc.framework.foundation import activated +from grpc.framework.foundation import logging_pool + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 +_THREAD_POOL_SIZE = 100 + + +class _FaceStub(object): + + def __init__(self, rear_link): + self._rear_link = rear_link + self._lock = threading.Lock() + self._pool = None + self._front = None + self._under_stub = None + + def __enter__(self): + with self._lock: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + self._front = tickets_implementations.front( + self._pool, self._pool, self._pool) + self._rear_link.start() + self._rear_link.join_fore_link(self._front) + self._front.join_rear_link(self._rear_link) + self._under_stub = face_implementations.stub(self._front, self._pool) + + def __exit__(self, exc_type, exc_val, exc_tb): + with self._lock: + self._under_stub = None + self._rear_link.stop() + base_utilities.wait_for_idle(self._front) + self._front = None + self._pool.shutdown(wait=True) + self._pool = None + return False + + def __getattr__(self, attr): + with self._lock: + if self._under_stub is None: + raise ValueError('Called out of context!') + else: + return getattr(self._under_stub, attr) + + +def _behaviors(implementations, front, pool): + behaviors = {} + stub = face_implementations.stub(front, pool) + for name, implementation in implementations.iteritems(): + if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: + behaviors[name] = stub.unary_unary_sync_async(name) + elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: + behaviors[name] = lambda request, context, bound_name=name: ( + stub.inline_value_in_stream_out(bound_name, request, context)) + elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: + behaviors[name] = stub.stream_unary_sync_async(name) + elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: + behaviors[name] = lambda request_iterator, context, bound_name=name: ( + stub.inline_stream_in_stream_out( + bound_name, request_iterator, context)) + return behaviors + + +class _DynamicInlineStub(object): + + def __init__(self, implementations, rear_link): + self._implementations = implementations + self._rear_link = rear_link + self._lock = threading.Lock() + self._pool = None + self._front = None + self._behaviors = None + + def __enter__(self): + with self._lock: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + self._front = tickets_implementations.front( + self._pool, self._pool, self._pool) + self._rear_link.start() + self._rear_link.join_fore_link(self._front) + self._front.join_rear_link(self._rear_link) + self._behaviors = _behaviors( + self._implementations, self._front, self._pool) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + with self._lock: + self._behaviors = None + self._rear_link.stop() + base_utilities.wait_for_idle(self._front) + self._front = None + self._pool.shutdown(wait=True) + self._pool = None + return False + + def __getattr__(self, attr): + with self._lock: + behavior = self._behaviors.get(attr) + if behavior is None: + raise AttributeError(attr) + else: + return behavior + + +def _servicer(implementations, pool): + inline_value_in_value_out_methods = {} + inline_value_in_stream_out_methods = {} + inline_stream_in_value_out_methods = {} + inline_stream_in_stream_out_methods = {} + event_value_in_value_out_methods = {} + event_value_in_stream_out_methods = {} + event_stream_in_value_out_methods = {} + event_stream_in_stream_out_methods = {} + + for name, implementation in implementations.iteritems(): + if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: + if implementation.style is style.Service.INLINE: + inline_value_in_value_out_methods[name] = ( + face_utilities.inline_unary_unary_method(implementation.unary_unary_inline)) + elif implementation.style is style.Service.EVENT: + event_value_in_value_out_methods[name] = ( + face_utilities.event_unary_unary_method(implementation.unary_unary_event)) + elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: + if implementation.style is style.Service.INLINE: + inline_value_in_stream_out_methods[name] = ( + face_utilities.inline_unary_stream_method(implementation.unary_stream_inline)) + elif implementation.style is style.Service.EVENT: + event_value_in_stream_out_methods[name] = ( + face_utilities.event_unary_stream_method(implementation.unary_stream_event)) + if implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: + if implementation.style is style.Service.INLINE: + inline_stream_in_value_out_methods[name] = ( + face_utilities.inline_stream_unary_method(implementation.stream_unary_inline)) + elif implementation.style is style.Service.EVENT: + event_stream_in_value_out_methods[name] = ( + face_utilities.event_stream_unary_method(implementation.stream_unary_event)) + elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: + if implementation.style is style.Service.INLINE: + inline_stream_in_stream_out_methods[name] = ( + face_utilities.inline_stream_stream_method(implementation.stream_stream_inline)) + elif implementation.style is style.Service.EVENT: + event_stream_in_stream_out_methods[name] = ( + face_utilities.event_stream_stream_method(implementation.stream_stream_event)) + + return face_implementations.servicer( + pool, + inline_value_in_value_out_methods=inline_value_in_value_out_methods, + inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, + event_value_in_value_out_methods=event_value_in_value_out_methods, + event_value_in_stream_out_methods=event_value_in_stream_out_methods, + event_stream_in_value_out_methods=event_stream_in_value_out_methods, + event_stream_in_stream_out_methods=event_stream_in_stream_out_methods) + + +class _ServiceAssembly(activated.Activated): + + def __init__(self, implementations, fore_link): + self._implementations = implementations + self._fore_link = fore_link + self._lock = threading.Lock() + self._pool = None + self._back = None + + def _start(self): + with self._lock: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + servicer = _servicer(self._implementations, self._pool) + self._back = tickets_implementations.back( + servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS, + _ONE_DAY_IN_SECONDS) + self._fore_link.start() + self._fore_link.join_rear_link(self._back) + self._back.join_fore_link(self._fore_link) + + def _stop(self): + with self._lock: + self._fore_link.stop() + base_utilities.wait_for_idle(self._back) + self._back = None + self._pool.shutdown(wait=True) + self._pool = None + + def __enter__(self): + self._start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._stop() + return False + + def start(self): + return self._start() + + def stop(self): + self._stop() + + +def assemble_face_stub(activated_rear_link): + """Assembles a face_interfaces.Stub. + + The returned object is a context manager and may only be used in context to + invoke RPCs. + + Args: + activated_rear_link: An object that is both a tickets_interfaces.RearLink + and an activated.Activated. The object should be in the inactive state + when passed to this method. + + Returns: + A face_interfaces.Stub on which, in context, RPCs can be invoked. + """ + return _FaceStub(activated_rear_link) + + +def assemble_dynamic_inline_stub(implementations, activated_rear_link): + """Assembles a stub with method names for attributes. + + The returned object is a context manager and may only be used in context to + invoke RPCs. + + The returned object, when used in context, will respond to attribute access + as follows: if the requested attribute is the name of a unary-unary RPC + method, the value of the attribute will be a + face_interfaces.UnaryUnarySyncAsync with which to invoke the RPC method. If + the requested attribute is the name of a unary-stream RPC method, the value + of the attribute will be a callable with the semantics of + face_interfaces.Stub.inline_value_in_stream_out, minus the "name" parameter, + with which to invoke the RPC method. If the requested attribute is the name + of a stream-unary RPC method, the value of the attribute will be a + face_interfaces.StreamUnarySyncAsync with which to invoke the RPC method. If + the requested attribute is the name of a stream-stream RPC method, the value + of the attribute will be a callable with the semantics of + face_interfaces.Stub.inline_stream_in_stream_out, minus the "name" parameter, + with which to invoke the RPC method. + + Args: + implementations: A dictionary from RPC method name to + interfaces.MethodImplementation. + activated_rear_link: An object that is both a tickets_interfaces.RearLink + and an activated.Activated. The object should be in the inactive state + when passed to this method. + + Returns: + A stub on which, in context, RPCs can be invoked. + """ + return _DynamicInlineStub(implementations, activated_rear_link) + + +def assemble_service(implementations, activated_fore_link): + """Assembles the service-side of the RPC Framework stack. + + Args: + implementations: A dictionary from RPC method name to + interfaces.MethodImplementation. + activated_fore_link: An object that is both a tickets_interfaces.ForeLink + and an activated.Activated. The object should be in the inactive state + when passed to this method. + + Returns: + An activated.Activated value encapsulating RPC service. + """ + return _ServiceAssembly(implementations, activated_fore_link) diff --git a/src/python/src/grpc/framework/assembly/implementations_test.py b/src/python/src/grpc/framework/assembly/implementations_test.py new file mode 100644 index 00000000000..74dc02ed83d --- /dev/null +++ b/src/python/src/grpc/framework/assembly/implementations_test.py @@ -0,0 +1,284 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# TODO(nathaniel): Expand this test coverage. + +"""Test of the GRPC-backed ForeLink and RearLink.""" + +import threading +import unittest + +from grpc.framework.assembly import implementations +from grpc.framework.assembly import utilities +from grpc.framework.base import interfaces +from grpc.framework.base.packets import packets as tickets +from grpc.framework.base.packets import interfaces as tickets_interfaces +from grpc.framework.base.packets import null +from grpc.framework.foundation import logging_pool +from grpc._junkdrawer import math_pb2 + +DIV = 'Div' +DIV_MANY = 'DivMany' +FIB = 'Fib' +SUM = 'Sum' + +def _fibbonacci(limit): + left, right = 0, 1 + for _ in xrange(limit): + yield left + left, right = right, left + right + + +def _div(request, unused_context): + return math_pb2.DivReply( + quotient=request.dividend / request.divisor, + remainder=request.dividend % request.divisor) + + +def _div_many(request_iterator, unused_context): + for request in request_iterator: + yield math_pb2.DivReply( + quotient=request.dividend / request.divisor, + remainder=request.dividend % request.divisor) + + +def _fib(request, unused_context): + for number in _fibbonacci(request.limit): + yield math_pb2.Num(num=number) + + +def _sum(request_iterator, unused_context): + accumulation = 0 + for request in request_iterator: + accumulation += request.num + return math_pb2.Num(num=accumulation) + + +_IMPLEMENTATIONS = { + DIV: utilities.unary_unary_inline(_div), + DIV_MANY: utilities.stream_stream_inline(_div_many), + FIB: utilities.unary_stream_inline(_fib), + SUM: utilities.stream_unary_inline(_sum), +} + +_TIMEOUT = 10 + + +class PipeLink(tickets_interfaces.ForeLink, tickets_interfaces.RearLink): + + def __init__(self): + self._fore_lock = threading.Lock() + self._fore_link = null.NULL_FORE_LINK + self._rear_lock = threading.Lock() + self._rear_link = null.NULL_REAR_LINK + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + def start(self): + pass + + def stop(self): + pass + + def accept_back_to_front_ticket(self, ticket): + with self._fore_lock: + self._fore_link.accept_back_to_front_ticket(ticket) + + def join_rear_link(self, rear_link): + with self._rear_lock: + self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link + + def accept_front_to_back_ticket(self, ticket): + with self._rear_lock: + self._rear_link.accept_front_to_back_ticket(ticket) + + def join_fore_link(self, fore_link): + with self._fore_lock: + self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link + + +class FaceStubTest(unittest.TestCase): + + def testUnaryUnary(self): + divisor = 7 + dividend = 13 + expected_quotient = 1 + expected_remainder = 6 + pipe = PipeLink() + service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) + face_stub = implementations.assemble_face_stub(pipe) + + service.start() + try: + with face_stub: + response = face_stub.blocking_value_in_value_out( + DIV, math_pb2.DivArgs(divisor=divisor, dividend=dividend), + _TIMEOUT) + self.assertEqual(expected_quotient, response.quotient) + self.assertEqual(expected_remainder, response.remainder) + finally: + service.stop() + + def testUnaryStream(self): + stream_length = 29 + pipe = PipeLink() + service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) + face_stub = implementations.assemble_face_stub(pipe) + + with service, face_stub: + responses = list( + face_stub.inline_value_in_stream_out( + FIB, math_pb2.FibArgs(limit=stream_length), _TIMEOUT)) + numbers = [response.num for response in responses] + for early, middle, later in zip(numbers, numbers[1:], numbers[2:]): + self.assertEqual(early + middle, later) + + def testStreamUnary(self): + stream_length = 13 + pipe = PipeLink() + service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) + face_stub = implementations.assemble_face_stub(pipe) + + with service, face_stub: + sync_async = face_stub.stream_unary_sync_async(SUM) + response_future = sync_async.async( + (math_pb2.Num(num=index) for index in range(stream_length)), + _TIMEOUT) + self.assertEqual( + (stream_length * (stream_length - 1)) / 2, + response_future.result().num) + + def testStreamStream(self): + stream_length = 17 + divisor_offset = 7 + dividend_offset = 17 + pipe = PipeLink() + service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) + face_stub = implementations.assemble_face_stub(pipe) + + with service, face_stub: + response_iterator = face_stub.inline_stream_in_stream_out( + DIV_MANY, + (math_pb2.DivArgs( + divisor=divisor_offset + index, + dividend=dividend_offset + index) + for index in range(stream_length)), + _TIMEOUT) + for index, response in enumerate(response_iterator): + self.assertEqual( + (dividend_offset + index) / (divisor_offset + index), + response.quotient) + self.assertEqual( + (dividend_offset + index) % (divisor_offset + index), + response.remainder) + self.assertEqual(stream_length, index + 1) + + +class DynamicInlineStubTest(unittest.TestCase): + + def testUnaryUnary(self): + divisor = 59 + dividend = 973 + expected_quotient = dividend / divisor + expected_remainder = dividend % divisor + pipe = PipeLink() + service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) + dynamic_stub = implementations.assemble_dynamic_inline_stub( + _IMPLEMENTATIONS, pipe) + + service.start() + with dynamic_stub: + response = dynamic_stub.Div( + math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT) + self.assertEqual(expected_quotient, response.quotient) + self.assertEqual(expected_remainder, response.remainder) + service.stop() + + def testUnaryStream(self): + stream_length = 43 + pipe = PipeLink() + service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) + dynamic_stub = implementations.assemble_dynamic_inline_stub( + _IMPLEMENTATIONS, pipe) + + with service, dynamic_stub: + response_iterator = dynamic_stub.Fib( + math_pb2.FibArgs(limit=stream_length), _TIMEOUT) + numbers = tuple(response.num for response in response_iterator) + for early, middle, later in zip(numbers, numbers[:1], numbers[:2]): + self.assertEqual(early + middle, later) + self.assertEqual(stream_length, len(numbers)) + + def testStreamUnary(self): + stream_length = 127 + pipe = PipeLink() + service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) + dynamic_stub = implementations.assemble_dynamic_inline_stub( + _IMPLEMENTATIONS, pipe) + + with service, dynamic_stub: + response_future = dynamic_stub.Sum.async( + (math_pb2.Num(num=index) for index in range(stream_length)), + _TIMEOUT) + self.assertEqual( + (stream_length * (stream_length - 1)) / 2, + response_future.result().num) + + def testStreamStream(self): + stream_length = 179 + divisor_offset = 71 + dividend_offset = 1763 + pipe = PipeLink() + service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) + dynamic_stub = implementations.assemble_dynamic_inline_stub( + _IMPLEMENTATIONS, pipe) + + with service, dynamic_stub: + response_iterator = dynamic_stub.DivMany( + (math_pb2.DivArgs( + divisor=divisor_offset + index, + dividend=dividend_offset + index) + for index in range(stream_length)), + _TIMEOUT) + for index, response in enumerate(response_iterator): + self.assertEqual( + (dividend_offset + index) / (divisor_offset + index), + response.quotient) + self.assertEqual( + (dividend_offset + index) % (divisor_offset + index), + response.remainder) + self.assertEqual(stream_length, index + 1) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/grpc/framework/assembly/interfaces.py b/src/python/src/grpc/framework/assembly/interfaces.py new file mode 100644 index 00000000000..e5d750b2bc2 --- /dev/null +++ b/src/python/src/grpc/framework/assembly/interfaces.py @@ -0,0 +1,91 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# TODO(nathaniel): The assembly layer only exists to smooth out wrinkles in +# the face layer. The two should be squashed together as soon as manageable. +"""Interfaces for assembling RPC Framework values.""" + +import abc + +# cardinality, style, and stream are referenced from specification in this +# module. +from grpc.framework.common import cardinality # pylint: disable=unused-import +from grpc.framework.common import style # pylint: disable=unused-import +from grpc.framework.foundation import stream # pylint: disable=unused-import + + +class MethodImplementation(object): + """A sum type that describes an RPC method implementation. + + Attributes: + cardinality: A cardinality.Cardinality value. + style: A style.Service value. + unary_unary_inline: The implementation of the RPC method as a callable + value that takes a request value and a face_interfaces.RpcContext object + and returns a response value. Only non-None if cardinality is + cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE. + unary_stream_inline: The implementation of the RPC method as a callable + value that takes a request value and a face_interfaces.RpcContext object + and returns an iterator of response values. Only non-None if cardinality + is cardinality.Cardinality.UNARY_STREAM and style is + style.Service.INLINE. + stream_unary_inline: The implementation of the RPC method as a callable + value that takes an iterator of request values and a + face_interfaces.RpcContext object and returns a response value. Only + non-None if cardinality is cardinality.Cardinality.STREAM_UNARY and style + is style.Service.INLINE. + stream_stream_inline: The implementation of the RPC method as a callable + value that takes an iterator of request values and a + face_interfaces.RpcContext object and returns an iterator of response + values. Only non-None if cardinality is + cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE. + unary_unary_event: The implementation of the RPC method as a callable value + that takes a request value, a response callback to which to pass the + response value of the RPC, and a face_interfaces.RpcContext. Only + non-None if cardinality is cardinality.Cardinality.UNARY_UNARY and style + is style.Service.EVENT. + unary_stream_event: The implementation of the RPC method as a callable + value that takes a request value, a stream.Consumer to which to pass the + the response values of the RPC, and a face_interfaces.RpcContext. Only + non-None if cardinality is cardinality.Cardinality.UNARY_STREAM and style + is style.Service.EVENT. + stream_unary_event: The implementation of the RPC method as a callable + value that takes a response callback to which to pass the response value + of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer + to which the request values of the RPC should be passed. Only non-None if + cardinality is cardinality.Cardinality.STREAM_UNARY and style is + style.Service.EVENT. + stream_stream_event: The implementation of the RPC method as a callable + value that takes a stream.Consumer to which to pass the response values + of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer + to which the request values of the RPC should be passed. Only non-None if + cardinality is cardinality.Cardinality.STREAM_STREAM and style is + style.Service.EVENT. + """ + __metaclass__ = abc.ABCMeta diff --git a/src/python/src/grpc/framework/assembly/utilities.py b/src/python/src/grpc/framework/assembly/utilities.py new file mode 100644 index 00000000000..80e7f59c03c --- /dev/null +++ b/src/python/src/grpc/framework/assembly/utilities.py @@ -0,0 +1,179 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for assembling RPC framework values.""" + +import collections + +from grpc.framework.assembly import interfaces +from grpc.framework.common import cardinality +from grpc.framework.common import style +from grpc.framework.face import interfaces as face_interfaces +from grpc.framework.foundation import stream + + +class _MethodImplementation( + interfaces.MethodImplementation, + collections.namedtuple( + '_MethodImplementation', + ['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline', + 'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event', + 'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])): + pass + + +def unary_unary_inline(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. + + Args: + behavior: The implementation of a unary-unary RPC method as a callable value + that takes a request value and a face_interfaces.RpcContext object and + returns a response value. + + Returns: + An interfaces.MethodImplementation derived from the given behavior. + """ + return _MethodImplementation( + cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior, + None, None, None, None, None, None, None) + + +def unary_stream_inline(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. + + Args: + behavior: The implementation of a unary-stream RPC method as a callable + value that takes a request value and a face_interfaces.RpcContext object + and returns an iterator of response values. + + Returns: + An interfaces.MethodImplementation derived from the given behavior. + """ + return _MethodImplementation( + cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None, + behavior, None, None, None, None, None, None) + + +def stream_unary_inline(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. + + Args: + behavior: The implementation of a stream-unary RPC method as a callable + value that takes an iterator of request values and a + face_interfaces.RpcContext object and returns a response value. + + Returns: + An interfaces.MethodImplementation derived from the given behavior. + """ + return _MethodImplementation( + cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None, + behavior, None, None, None, None, None) + + +def stream_stream_inline(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. + + Args: + behavior: The implementation of a stream-stream RPC method as a callable + value that takes an iterator of request values and a + face_interfaces.RpcContext object and returns an iterator of response + values. + + Returns: + An interfaces.MethodImplementation derived from the given behavior. + """ + return _MethodImplementation( + cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None, + None, behavior, None, None, None, None) + + +def unary_unary_event(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. + + Args: + behavior: The implementation of a unary-unary RPC method as a callable + value that takes a request value, a response callback to which to pass + the response value of the RPC, and a face_interfaces.RpcContext. + + Returns: + An interfaces.MethodImplementation derived from the given behavior. + """ + return _MethodImplementation( + cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None, + None, None, behavior, None, None, None) + + +def unary_stream_event(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. + + Args: + behavior: The implementation of a unary-stream RPC method as a callable + value that takes a request value, a stream.Consumer to which to pass the + the response values of the RPC, and a face_interfaces.RpcContext. + + Returns: + An interfaces.MethodImplementation derived from the given behavior. + """ + return _MethodImplementation( + cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None, + None, None, None, behavior, None, None) + + +def stream_unary_event(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. + + Args: + behavior: The implementation of a stream-unary RPC method as a callable + value that takes a response callback to which to pass the response value + of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer + to which the request values of the RPC should be passed. + + Returns: + An interfaces.MethodImplementation derived from the given behavior. + """ + return _MethodImplementation( + cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None, + None, None, None, None, behavior, None) + + +def stream_stream_event(behavior): + """Creates an interfaces.MethodImplementation for the given behavior. + + Args: + behavior: The implementation of a stream-stream RPC method as a callable + value that takes a stream.Consumer to which to pass the response values + of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer + to which the request values of the RPC should be passed. + + Returns: + An interfaces.MethodImplementation derived from the given behavior. + """ + return _MethodImplementation( + cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None, + None, None, None, None, None, behavior) diff --git a/src/python/src/grpc/framework/common/style.py b/src/python/src/grpc/framework/common/style.py new file mode 100644 index 00000000000..6ae694bdcb4 --- /dev/null +++ b/src/python/src/grpc/framework/common/style.py @@ -0,0 +1,40 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Defines an enum for classifying RPC methods by control flow semantics.""" + +import enum + + +@enum.unique +class Service(enum.Enum): + """Describes the control flow style of RPC method implementation.""" + + INLINE = 'inline' + EVENT = 'event' diff --git a/src/python/src/grpc/framework/face/utilities.py b/src/python/src/grpc/framework/face/utilities.py new file mode 100644 index 00000000000..5e34be37da7 --- /dev/null +++ b/src/python/src/grpc/framework/face/utilities.py @@ -0,0 +1,221 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for the face layer of RPC Framework.""" + +# stream is referenced from specification in this module. +from grpc.framework.face import interfaces +from grpc.framework.foundation import stream # pylint: disable=unused-import + + +class _InlineUnaryUnaryMethod(interfaces.InlineValueInValueOutMethod): + + def __init__(self, behavior): + self._behavior = behavior + + def service(self, request, context): + return self._behavior(request, context) + + +class _InlineUnaryStreamMethod(interfaces.InlineValueInStreamOutMethod): + + def __init__(self, behavior): + self._behavior = behavior + + def service(self, request, context): + return self._behavior(request, context) + + +class _InlineStreamUnaryMethod(interfaces.InlineStreamInValueOutMethod): + + def __init__(self, behavior): + self._behavior = behavior + + def service(self, request_iterator, context): + return self._behavior(request_iterator, context) + + +class _InlineStreamStreamMethod(interfaces.InlineStreamInStreamOutMethod): + + def __init__(self, behavior): + self._behavior = behavior + + def service(self, request_iterator, context): + return self._behavior(request_iterator, context) + + +class _EventUnaryUnaryMethod(interfaces.EventValueInValueOutMethod): + + def __init__(self, behavior): + self._behavior = behavior + + def service(self, request, response_callback, context): + return self._behavior(request, response_callback, context) + + +class _EventUnaryStreamMethod(interfaces.EventValueInStreamOutMethod): + + def __init__(self, behavior): + self._behavior = behavior + + def service(self, request, response_consumer, context): + return self._behavior(request, response_consumer, context) + + +class _EventStreamUnaryMethod(interfaces.EventStreamInValueOutMethod): + + def __init__(self, behavior): + self._behavior = behavior + + def service(self, response_callback, context): + return self._behavior(response_callback, context) + + +class _EventStreamStreamMethod(interfaces.EventStreamInStreamOutMethod): + + def __init__(self, behavior): + self._behavior = behavior + + def service(self, response_consumer, context): + return self._behavior(response_consumer, context) + + +def inline_unary_unary_method(behavior): + """Creates an interfaces.InlineValueInValueOutMethod from a behavior. + + Args: + behavior: The implementation of a unary-unary RPC method as a callable + value that takes a request value and an interfaces.RpcContext object and + returns a response value. + + Returns: + An interfaces.InlineValueInValueOutMethod derived from the given behavior. + """ + return _InlineUnaryUnaryMethod(behavior) + + +def inline_unary_stream_method(behavior): + """Creates an interfaces.InlineValueInStreamOutMethod from a behavior. + + Args: + behavior: The implementation of a unary-stream RPC method as a callable + value that takes a request value and an interfaces.RpcContext object and + returns an iterator of response values. + + Returns: + An interfaces.InlineValueInStreamOutMethod derived from the given behavior. + """ + return _InlineUnaryStreamMethod(behavior) + + +def inline_stream_unary_method(behavior): + """Creates an interfaces.InlineStreamInValueOutMethod from a behavior. + + Args: + behavior: The implementation of a stream-unary RPC method as a callable + value that takes an iterator of request values and an + interfaces.RpcContext object and returns a response value. + + Returns: + An interfaces.InlineStreamInValueOutMethod derived from the given behavior. + """ + return _InlineStreamUnaryMethod(behavior) + + +def inline_stream_stream_method(behavior): + """Creates an interfaces.InlineStreamInStreamOutMethod from a behavior. + + Args: + behavior: The implementation of a stream-stream RPC method as a callable + value that takes an iterator of request values and an + interfaces.RpcContext object and returns an iterator of response values. + + Returns: + An interfaces.InlineStreamInStreamOutMethod derived from the given + behavior. + """ + return _InlineStreamStreamMethod(behavior) + + +def event_unary_unary_method(behavior): + """Creates an interfaces.EventValueInValueOutMethod from a behavior. + + Args: + behavior: The implementation of a unary-unary RPC method as a callable + value that takes a request value, a response callback to which to pass + the response value of the RPC, and an interfaces.RpcContext. + + Returns: + An interfaces.EventValueInValueOutMethod derived from the given behavior. + """ + return _EventUnaryUnaryMethod(behavior) + + +def event_unary_stream_method(behavior): + """Creates an interfaces.EventValueInStreamOutMethod from a behavior. + + Args: + behavior: The implementation of a unary-stream RPC method as a callable + value that takes a request value, a stream.Consumer to which to pass the + response values of the RPC, and an interfaces.RpcContext. + + Returns: + An interfaces.EventValueInStreamOutMethod derived from the given behavior. + """ + return _EventUnaryStreamMethod(behavior) + + +def event_stream_unary_method(behavior): + """Creates an interfaces.EventStreamInValueOutMethod from a behavior. + + Args: + behavior: The implementation of a stream-unary RPC method as a callable + value that takes a response callback to which to pass the response value + of the RPC and an interfaces.RpcContext and returns a stream.Consumer to + which the request values of the RPC should be passed. + + Returns: + An interfaces.EventStreamInValueOutMethod derived from the given behavior. + """ + return _EventStreamUnaryMethod(behavior) + + +def event_stream_stream_method(behavior): + """Creates an interfaces.EventStreamInStreamOutMethod from a behavior. + + Args: + behavior: The implementation of a stream-stream RPC method as a callable + value that takes a stream.Consumer to which to pass the response values + of the RPC and an interfaces.RpcContext and returns a stream.Consumer to + which the request values of the RPC should be passed. + + Returns: + An interfaces.EventStreamInStreamOutMethod derived from the given behavior. + """ + return _EventStreamStreamMethod(behavior) diff --git a/src/python/src/grpc/framework/foundation/activated.py b/src/python/src/grpc/framework/foundation/activated.py new file mode 100644 index 00000000000..426a71c7059 --- /dev/null +++ b/src/python/src/grpc/framework/foundation/activated.py @@ -0,0 +1,65 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces related to streams of values or objects.""" + +import abc + + +class Activated(object): + """Interface for objects that may be started and stopped. + + Values implementing this type must also implement the context manager + protocol. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __enter__(self): + """See the context manager protocol for specification.""" + raise NotImplementedError() + + @abc.abstractmethod + def __exit__(self, exc_type, exc_val, exc_tb): + """See the context manager protocol for specification.""" + raise NotImplementedError() + + @abc.abstractmethod + def start(self): + """Activates this object. + + Returns: + A value equal to the value returned by this object's __enter__ method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stop(self): + """Deactivates this object.""" + raise NotImplementedError() diff --git a/src/python/src/setup.py b/src/python/src/setup.py index 8e33ebb31c4..e3f13fa5c8f 100644 --- a/src/python/src/setup.py +++ b/src/python/src/setup.py @@ -56,12 +56,13 @@ _EXTENSION_MODULE = _core.Extension( libraries=_EXTENSION_LIBRARIES, ) -_PACKAGES=( +_PACKAGES = ( 'grpc', 'grpc._adapter', 'grpc._junkdrawer', 'grpc.early_adopter', 'grpc.framework', + 'grpc.framework.assembly', 'grpc.framework.base', 'grpc.framework.base.packets', 'grpc.framework.common', diff --git a/tools/dockerfile/grpc_python/Dockerfile b/tools/dockerfile/grpc_python/Dockerfile index d434b473510..362227bb652 100644 --- a/tools/dockerfile/grpc_python/Dockerfile +++ b/tools/dockerfile/grpc_python/Dockerfile @@ -24,12 +24,13 @@ RUN cd /var/local/git/grpc \ && python2.7 -B -m grpc._adapter._links_test && python2.7 -B -m grpc._adapter._lonely_rear_link_test && python2.7 -B -m grpc._adapter._low_test - && python2.7 -B -m grpc._framework.base.packets.implementations_test - && python2.7 -B -m grpc._framework.face.blocking_invocation_inline_service_test - && python2.7 -B -m grpc._framework.face.event_invocation_synchronous_event_service_test - && python2.7 -B -m grpc._framework.face.future_invocation_asynchronous_event_service_test - && python2.7 -B -m grpc._framework.foundation._later_test - && python2.7 -B -m grpc._framework.foundation._logging_pool_test + && python2.7 -B -m grpc.framework.assembly.implementations_test + && python2.7 -B -m grpc.framework.base.packets.implementations_test + && python2.7 -B -m grpc.framework.face.blocking_invocation_inline_service_test + && python2.7 -B -m grpc.framework.face.event_invocation_synchronous_event_service_test + && python2.7 -B -m grpc.framework.face.future_invocation_asynchronous_event_service_test + && python2.7 -B -m grpc.framework.foundation._later_test + && python2.7 -B -m grpc.framework.foundation._logging_pool_test # Add a cacerts directory containing the Google root pem file, allowing the interop client to access the production test instance ADD cacerts cacerts diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh index f21f854b09a..1b8fe1982d1 100755 --- a/tools/run_tests/run_python.sh +++ b/tools/run_tests/run_python.sh @@ -44,6 +44,7 @@ python2.7 -B -m grpc._adapter._future_invocation_asynchronous_event_service_test python2.7 -B -m grpc._adapter._links_test python2.7 -B -m grpc._adapter._lonely_rear_link_test python2.7 -B -m grpc._adapter._low_test +python2.7 -B -m grpc.framework.assembly.implementations_test python2.7 -B -m grpc.framework.base.packets.implementations_test python2.7 -B -m grpc.framework.face.blocking_invocation_inline_service_test python2.7 -B -m grpc.framework.face.event_invocation_synchronous_event_service_test