diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py index ee3d72fdbc8..729b987dd12 100644 --- a/src/python/grpcio/grpc/_links/invocation.py +++ b/src/python/grpcio/grpc/_links/invocation.py @@ -41,6 +41,13 @@ from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay from grpc.framework.interfaces.links import links +_STOP = _intermediary_low.Event.Kind.STOP +_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED +_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED +_READ = _intermediary_low.Event.Kind.READ_ACCEPTED +_METADATA = _intermediary_low.Event.Kind.METADATA_ACCEPTED +_FINISH = _intermediary_low.Event.Kind.FINISH + @enum.unique class _Read(enum.Enum): @@ -67,7 +74,7 @@ class _RPCState(object): def __init__( self, call, request_serializer, response_deserializer, sequence_number, - read, allowance, high_write, low_write): + read, allowance, high_write, low_write, due): self.call = call self.request_serializer = request_serializer self.response_deserializer = response_deserializer @@ -76,6 +83,13 @@ class _RPCState(object): self.allowance = allowance self.high_write = high_write self.low_write = low_write + self.due = due + + +def _no_longer_due(kind, rpc_state, key, rpc_states): + rpc_state.due.remove(kind) + if not rpc_state.due: + del rpc_states[key] class _Kernel(object): @@ -91,12 +105,14 @@ class _Kernel(object): self._relay = ticket_relay self._completion_queue = None - self._rpc_states = None + self._rpc_states = {} self._pool = None def _on_write_event(self, operation_id, unused_event, rpc_state): if rpc_state.high_write is _HighWrite.CLOSED: rpc_state.call.complete(operation_id) + rpc_state.due.add(_COMPLETE) + rpc_state.due.remove(_WRITE) rpc_state.low_write = _LowWrite.CLOSED else: ticket = links.Ticket( @@ -105,16 +121,19 @@ class _Kernel(object): rpc_state.sequence_number += 1 self._relay.add_value(ticket) rpc_state.low_write = _LowWrite.OPEN + _no_longer_due(_WRITE, rpc_state, operation_id, self._rpc_states) def _on_read_event(self, operation_id, event, rpc_state): - if event.bytes is None: + if event.bytes is None or _FINISH not in rpc_state.due: rpc_state.read = _Read.CLOSED + _no_longer_due(_READ, rpc_state, operation_id, self._rpc_states) else: if 0 < rpc_state.allowance: rpc_state.allowance -= 1 rpc_state.call.read(operation_id) else: rpc_state.read = _Read.AWAITING_ALLOWANCE + _no_longer_due(_READ, rpc_state, operation_id, self._rpc_states) ticket = links.Ticket( operation_id, rpc_state.sequence_number, None, None, None, None, None, None, rpc_state.response_deserializer(event.bytes), None, None, None, @@ -123,18 +142,23 @@ class _Kernel(object): self._relay.add_value(ticket) def _on_metadata_event(self, operation_id, event, rpc_state): - rpc_state.allowance -= 1 - rpc_state.call.read(operation_id) - rpc_state.read = _Read.READING - ticket = links.Ticket( - operation_id, rpc_state.sequence_number, None, None, - links.Ticket.Subscription.FULL, None, None, event.metadata, None, None, - None, None, None, None) - rpc_state.sequence_number += 1 - self._relay.add_value(ticket) + if _FINISH in rpc_state.due: + rpc_state.allowance -= 1 + rpc_state.call.read(operation_id) + rpc_state.read = _Read.READING + rpc_state.due.add(_READ) + rpc_state.due.remove(_METADATA) + ticket = links.Ticket( + operation_id, rpc_state.sequence_number, None, None, + links.Ticket.Subscription.FULL, None, None, event.metadata, None, + None, None, None, None, None) + rpc_state.sequence_number += 1 + self._relay.add_value(ticket) + else: + _no_longer_due(_METADATA, rpc_state, operation_id, self._rpc_states) def _on_finish_event(self, operation_id, event, rpc_state): - self._rpc_states.pop(operation_id, None) + _no_longer_due(_FINISH, rpc_state, operation_id, self._rpc_states) if event.status.code is _intermediary_low.Code.OK: termination = links.Ticket.Termination.COMPLETION elif event.status.code is _intermediary_low.Code.CANCELLED: @@ -155,26 +179,26 @@ class _Kernel(object): def _spin(self, completion_queue): while True: event = completion_queue.get(None) - if event.kind is _intermediary_low.Event.Kind.STOP: - return - operation_id = event.tag with self._lock: - if self._completion_queue is None: - continue - rpc_state = self._rpc_states.get(operation_id) - if rpc_state is not None: - if event.kind is _intermediary_low.Event.Kind.WRITE_ACCEPTED: - self._on_write_event(operation_id, event, rpc_state) - elif event.kind is _intermediary_low.Event.Kind.METADATA_ACCEPTED: - self._on_metadata_event(operation_id, event, rpc_state) - elif event.kind is _intermediary_low.Event.Kind.READ_ACCEPTED: - self._on_read_event(operation_id, event, rpc_state) - elif event.kind is _intermediary_low.Event.Kind.FINISH: - self._on_finish_event(operation_id, event, rpc_state) - elif event.kind is _intermediary_low.Event.Kind.COMPLETE_ACCEPTED: - pass - else: - logging.error('Illegal RPC event! %s', (event,)) + rpc_state = self._rpc_states.get(event.tag, None) + if event.kind is _STOP: + pass + elif event.kind is _WRITE: + self._on_write_event(event.tag, event, rpc_state) + elif event.kind is _METADATA: + self._on_metadata_event(event.tag, event, rpc_state) + elif event.kind is _READ: + self._on_read_event(event.tag, event, rpc_state) + elif event.kind is _FINISH: + self._on_finish_event(event.tag, event, rpc_state) + elif event.kind is _COMPLETE: + _no_longer_due(_COMPLETE, rpc_state, event.tag, self._rpc_states) + else: + logging.error('Illegal RPC event! %s', (event,)) + + if self._completion_queue is None and not self._rpc_states: + completion_queue.stop() + return def _invoke( self, operation_id, group, method, initial_metadata, payload, termination, @@ -221,26 +245,31 @@ class _Kernel(object): if high_write is _HighWrite.CLOSED: call.complete(operation_id) low_write = _LowWrite.CLOSED + due = set((_METADATA, _COMPLETE, _FINISH,)) else: low_write = _LowWrite.OPEN + due = set((_METADATA, _FINISH,)) else: call.write(request_serializer(payload), operation_id) low_write = _LowWrite.ACTIVE + due = set((_WRITE, _METADATA, _FINISH,)) self._rpc_states[operation_id] = _RPCState( call, request_serializer, response_deserializer, 0, _Read.AWAITING_METADATA, 1 if allowance is None else (1 + allowance), - high_write, low_write) + high_write, low_write, due) def _advance(self, operation_id, rpc_state, payload, termination, allowance): if payload is not None: rpc_state.call.write(rpc_state.request_serializer(payload), operation_id) rpc_state.low_write = _LowWrite.ACTIVE + rpc_state.due.add(_WRITE) if allowance is not None: if rpc_state.read is _Read.AWAITING_ALLOWANCE: rpc_state.allowance += allowance - 1 rpc_state.call.read(operation_id) rpc_state.read = _Read.READING + rpc_state.due.add(_READ) else: rpc_state.allowance += allowance @@ -248,19 +277,21 @@ class _Kernel(object): rpc_state.high_write = _HighWrite.CLOSED if rpc_state.low_write is _LowWrite.OPEN: rpc_state.call.complete(operation_id) + rpc_state.due.add(_COMPLETE) rpc_state.low_write = _LowWrite.CLOSED elif termination is not None: rpc_state.call.cancel() def add_ticket(self, ticket): with self._lock: - if self._completion_queue is None: - return if ticket.sequence_number == 0: - self._invoke( - ticket.operation_id, ticket.group, ticket.method, - ticket.initial_metadata, ticket.payload, ticket.termination, - ticket.timeout, ticket.allowance) + if self._completion_queue is None: + logging.error('Received invocation ticket %s after stop!', ticket) + else: + self._invoke( + ticket.operation_id, ticket.group, ticket.method, + ticket.initial_metadata, ticket.payload, ticket.termination, + ticket.timeout, ticket.allowance) else: rpc_state = self._rpc_states.get(ticket.operation_id) if rpc_state is not None: @@ -276,7 +307,6 @@ class _Kernel(object): """ with self._lock: self._completion_queue = _intermediary_low.CompletionQueue() - self._rpc_states = {} self._pool = logging_pool.pool(1) self._pool.submit(self._spin, self._completion_queue) @@ -288,11 +318,10 @@ class _Kernel(object): has been called. """ with self._lock: - self._completion_queue.stop() + if not self._rpc_states: + self._completion_queue.stop() self._completion_queue = None pool = self._pool - self._pool = None - self._rpc_states = None pool.shutdown(wait=True) diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index c5ecc47cd9c..bbfe9bcd55f 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -53,6 +53,13 @@ _TERMINATION_KIND_TO_CODE = { links.Ticket.Termination.REMOTE_FAILURE: _intermediary_low.Code.UNKNOWN, } +_STOP = _intermediary_low.Event.Kind.STOP +_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED +_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED +_SERVICE = _intermediary_low.Event.Kind.SERVICE_ACCEPTED +_READ = _intermediary_low.Event.Kind.READ_ACCEPTED +_FINISH = _intermediary_low.Event.Kind.FINISH + @enum.unique class _Read(enum.Enum): @@ -84,7 +91,7 @@ class _RPCState(object): def __init__( self, request_deserializer, response_serializer, sequence_number, read, early_read, allowance, high_write, low_write, premetadataed, - terminal_metadata, code, message): + terminal_metadata, code, message, due): self.request_deserializer = request_deserializer self.response_serializer = response_serializer self.sequence_number = sequence_number @@ -99,6 +106,13 @@ class _RPCState(object): self.terminal_metadata = terminal_metadata self.code = code self.message = message + self.due = due + + +def _no_longer_due(kind, rpc_state, key, rpc_states): + rpc_state.due.remove(kind) + if not rpc_state.due: + del rpc_states[key] def _metadatafy(call, metadata): @@ -124,6 +138,7 @@ class _Kernel(object): self._relay = ticket_relay self._completion_queue = None + self._due = set() self._server = None self._rpc_states = {} self._pool = None @@ -149,7 +164,8 @@ class _Kernel(object): call.read(call) self._rpc_states[call] = _RPCState( request_deserializer, response_serializer, 1, _Read.READING, None, 1, - _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None) + _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None, + set((_READ, _FINISH,))) ticket = links.Ticket( call, 0, group, method, links.Ticket.Subscription.FULL, service_acceptance.deadline - time.time(), None, event.metadata, None, @@ -158,14 +174,13 @@ class _Kernel(object): def _on_read_event(self, event): call = event.tag - rpc_state = self._rpc_states.get(call, None) - if rpc_state is None: - return + rpc_state = self._rpc_states[call] if event.bytes is None: rpc_state.read = _Read.CLOSED payload = None termination = links.Ticket.Termination.COMPLETION + _no_longer_due(_READ, rpc_state, call, self._rpc_states) else: if 0 < rpc_state.allowance: payload = rpc_state.request_deserializer(event.bytes) @@ -174,6 +189,7 @@ class _Kernel(object): call.read(call) else: rpc_state.early_read = event.bytes + _no_longer_due(_READ, rpc_state, call, self._rpc_states) return # TODO(issue 2916): Instead of returning: # rpc_state.read = _Read.AWAITING_ALLOWANCE @@ -185,9 +201,7 @@ class _Kernel(object): def _on_write_event(self, event): call = event.tag - rpc_state = self._rpc_states.get(call, None) - if rpc_state is None: - return + rpc_state = self._rpc_states[call] if rpc_state.high_write is _HighWrite.CLOSED: if rpc_state.terminal_metadata is not None: @@ -197,6 +211,8 @@ class _Kernel(object): rpc_state.message) call.status(status, call) rpc_state.low_write = _LowWrite.CLOSED + rpc_state.due.add(_COMPLETE) + rpc_state.due.remove(_WRITE) else: ticket = links.Ticket( call, rpc_state.sequence_number, None, None, None, None, 1, None, @@ -204,12 +220,12 @@ class _Kernel(object): rpc_state.sequence_number += 1 self._relay.add_value(ticket) rpc_state.low_write = _LowWrite.OPEN + _no_longer_due(_WRITE, rpc_state, call, self._rpc_states) def _on_finish_event(self, event): call = event.tag - rpc_state = self._rpc_states.pop(call, None) - if rpc_state is None: - return + rpc_state = self._rpc_states[call] + _no_longer_due(_FINISH, rpc_state, call, self._rpc_states) code = event.status.code if code is _intermediary_low.Code.OK: return @@ -229,28 +245,33 @@ class _Kernel(object): def _spin(self, completion_queue, server): while True: event = completion_queue.get(None) - if event.kind is _intermediary_low.Event.Kind.STOP: - return with self._lock: - if self._server is None: - continue - elif event.kind is _intermediary_low.Event.Kind.SERVICE_ACCEPTED: - self._on_service_acceptance_event(event, server) - elif event.kind is _intermediary_low.Event.Kind.READ_ACCEPTED: + if event.kind is _STOP: + self._due.remove(_STOP) + elif event.kind is _READ: self._on_read_event(event) - elif event.kind is _intermediary_low.Event.Kind.WRITE_ACCEPTED: + elif event.kind is _WRITE: self._on_write_event(event) - elif event.kind is _intermediary_low.Event.Kind.COMPLETE_ACCEPTED: - pass + elif event.kind is _COMPLETE: + _no_longer_due( + _COMPLETE, self._rpc_states.get(event.tag), event.tag, + self._rpc_states) elif event.kind is _intermediary_low.Event.Kind.FINISH: self._on_finish_event(event) + elif event.kind is _SERVICE: + if self._server is None: + self._due.remove(_SERVICE) + else: + self._on_service_acceptance_event(event, server) else: logging.error('Illegal event! %s', (event,)) + if not self._due and not self._rpc_states: + completion_queue.stop() + return + def add_ticket(self, ticket): with self._lock: - if self._server is None: - return call = ticket.operation_id rpc_state = self._rpc_states.get(call) if rpc_state is None: @@ -278,6 +299,7 @@ class _Kernel(object): rpc_state.early_read = None if rpc_state.read is _Read.READING: call.read(call) + rpc_state.due.add(_READ) termination = None else: termination = links.Ticket.Termination.COMPLETION @@ -289,6 +311,7 @@ class _Kernel(object): if ticket.payload is not None: call.write(rpc_state.response_serializer(ticket.payload), call) + rpc_state.due.add(_WRITE) rpc_state.low_write = _LowWrite.ACTIVE if ticket.terminal_metadata is not None: @@ -307,6 +330,7 @@ class _Kernel(object): links.Ticket.Termination.COMPLETION, rpc_state.code, rpc_state.message) call.status(status, call) + rpc_state.due.add(_COMPLETE) rpc_state.low_write = _LowWrite.CLOSED elif ticket.termination is not None: if rpc_state.terminal_metadata is not None: @@ -314,7 +338,7 @@ class _Kernel(object): status = _status( ticket.termination, rpc_state.code, rpc_state.message) call.status(status, call) - self._rpc_states.pop(call, None) + rpc_state.due.add(_COMPLETE) def add_port(self, address, server_credentials): with self._lock: @@ -335,19 +359,17 @@ class _Kernel(object): self._pool.submit(self._spin, self._completion_queue, self._server) self._server.start() self._server.service(None) + self._due.add(_SERVICE) def begin_stop(self): with self._lock: self._server.stop() + self._due.add(_STOP) self._server = None def end_stop(self): with self._lock: - self._completion_queue.stop() - self._completion_queue = None pool = self._pool - self._pool = None - self._rpc_states = None pool.shutdown(wait=True) @@ -369,7 +391,7 @@ class ServiceLink(links.Link): None for insecure service. Returns: - A integer port on which RPCs will be serviced after this link has been + An integer port on which RPCs will be serviced after this link has been started. This is typically the same number as the port number contained in the passed address, but will likely be different if the port number contained in the passed address was zero. diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py new file mode 100644 index 00000000000..4e46ffd17fc --- /dev/null +++ b/src/python/grpcio/grpc/beta/_server.py @@ -0,0 +1,112 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Beta API server implementation.""" + +import threading + +from grpc._links import service +from grpc.framework.core import implementations as _core_implementations +from grpc.framework.crust import implementations as _crust_implementations +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.links import utilities + +_DEFAULT_POOL_SIZE = 8 +_DEFAULT_TIMEOUT = 300 +_MAXIMUM_TIMEOUT = 24 * 60 * 60 + + +def _disassemble(grpc_link, end_link, pool, event, grace): + grpc_link.begin_stop() + end_link.stop(grace).wait() + grpc_link.end_stop() + grpc_link.join_link(utilities.NULL_LINK) + end_link.join_link(utilities.NULL_LINK) + if pool is not None: + pool.shutdown(wait=True) + event.set() + + +class Server(object): + + def __init__(self, grpc_link, end_link, pool): + self._grpc_link = grpc_link + self._end_link = end_link + self._pool = pool + + def add_insecure_port(self, address): + return self._grpc_link.add_port(address, None) + + def add_secure_port(self, address, intermediary_low_server_credentials): + return self._grpc_link.add_port( + address, intermediary_low_server_credentials) + + def start(self): + self._grpc_link.join_link(self._end_link) + self._end_link.join_link(self._grpc_link) + self._grpc_link.start() + self._end_link.start() + + def stop(self, grace): + stop_event = threading.Event() + if 0 < grace: + disassembly_thread = threading.Thread( + target=_disassemble, + args=( + self._grpc_link, self._end_link, self._pool, stop_event, grace,)) + disassembly_thread.start() + return stop_event + else: + _disassemble(self._grpc_link, self._end_link, self._pool, stop_event, 0) + return stop_event + + +def server( + implementations, multi_implementation, request_deserializers, + response_serializers, thread_pool, thread_pool_size, default_timeout, + maximum_timeout): + if thread_pool is None: + service_thread_pool = logging_pool.pool( + _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size) + assembly_thread_pool = service_thread_pool + else: + service_thread_pool = thread_pool + assembly_thread_pool = None + + servicer = _crust_implementations.servicer( + implementations, multi_implementation, service_thread_pool) + + grpc_link = service.service_link(request_deserializers, response_serializers) + + end_link = _core_implementations.service_end_link( + servicer, + _DEFAULT_TIMEOUT if default_timeout is None else default_timeout, + _MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout) + + return Server(grpc_link, end_link, assembly_thread_pool) diff --git a/src/python/grpcio/grpc/beta/_stub.py b/src/python/grpcio/grpc/beta/_stub.py new file mode 100644 index 00000000000..178f06d21e5 --- /dev/null +++ b/src/python/grpcio/grpc/beta/_stub.py @@ -0,0 +1,109 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Beta API stub implementation.""" + +import threading + +from grpc._links import invocation +from grpc.framework.core import implementations as _core_implementations +from grpc.framework.crust import implementations as _crust_implementations +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.links import utilities + +_DEFAULT_POOL_SIZE = 6 + + +class _AutoIntermediary(object): + + def __init__(self, delegate, on_deletion): + self._delegate = delegate + self._on_deletion = on_deletion + + def __getattr__(self, attr): + return getattr(self._delegate, attr) + + def __del__(self): + self._on_deletion() + + +def _assemble( + channel, host, request_serializers, response_deserializers, thread_pool, + thread_pool_size): + end_link = _core_implementations.invocation_end_link() + grpc_link = invocation.invocation_link( + channel, host, request_serializers, response_deserializers) + if thread_pool is None: + invocation_pool = logging_pool.pool( + _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size) + assembly_pool = invocation_pool + else: + invocation_pool = thread_pool + assembly_pool = None + end_link.join_link(grpc_link) + grpc_link.join_link(end_link) + end_link.start() + grpc_link.start() + return end_link, grpc_link, invocation_pool, assembly_pool + + +def _disassemble(end_link, grpc_link, pool): + end_link.stop(24 * 60 * 60).wait() + grpc_link.stop() + end_link.join_link(utilities.NULL_LINK) + grpc_link.join_link(utilities.NULL_LINK) + if pool is not None: + pool.shutdown(wait=True) + + +def _wrap_assembly(stub, end_link, grpc_link, assembly_pool): + disassembly_thread = threading.Thread( + target=_disassemble, args=(end_link, grpc_link, assembly_pool)) + return _AutoIntermediary(stub, disassembly_thread.start) + + +def generic_stub( + channel, host, request_serializers, response_deserializers, thread_pool, + thread_pool_size): + end_link, grpc_link, invocation_pool, assembly_pool = _assemble( + channel, host, request_serializers, response_deserializers, thread_pool, + thread_pool_size) + stub = _crust_implementations.generic_stub(end_link, invocation_pool) + return _wrap_assembly(stub, end_link, grpc_link, assembly_pool) + + +def dynamic_stub( + channel, host, service, cardinalities, request_serializers, + response_deserializers, thread_pool, thread_pool_size): + end_link, grpc_link, invocation_pool, assembly_pool = _assemble( + channel, host, request_serializers, response_deserializers, thread_pool, + thread_pool_size) + stub = _crust_implementations.dynamic_stub( + end_link, service, cardinalities, invocation_pool) + return _wrap_assembly(stub, end_link, grpc_link, assembly_pool) diff --git a/src/python/grpcio/grpc/beta/beta.py b/src/python/grpcio/grpc/beta/beta.py index 40cad5e4868..640e4eb86b5 100644 --- a/src/python/grpcio/grpc/beta/beta.py +++ b/src/python/grpcio/grpc/beta/beta.py @@ -27,13 +27,21 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Entry points into gRPC Python Beta.""" +"""Entry points into the Beta API of gRPC Python.""" +# threading is referenced from specification in this module. +import abc import enum +import threading # pylint: disable=unused-import -from grpc._adapter import _low +# cardinality and face are referenced from specification in this module. +from grpc._adapter import _intermediary_low from grpc._adapter import _types from grpc.beta import _connectivity_channel +from grpc.beta import _server +from grpc.beta import _stub +from grpc.framework.common import cardinality # pylint: disable=unused-import +from grpc.framework.interfaces.face import face # pylint: disable=unused-import _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 'Exception calling channel subscription callback!') @@ -65,6 +73,39 @@ _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { } +class ClientCredentials(object): + """A value encapsulating the data required to create a secure Channel. + + This class and its instances have no supported interface - it exists to define + the type of its instances and its instances exist to be passed to other + functions. + """ + + def __init__(self, low_credentials, intermediary_low_credentials): + self._low_credentials = low_credentials + self._intermediary_low_credentials = intermediary_low_credentials + + +def ssl_client_credentials(root_certificates, private_key, certificate_chain): + """Creates a ClientCredentials for use with an SSL-enabled Channel. + + Args: + root_certificates: The PEM-encoded root certificates or None to ask for + them to be retrieved from a default location. + private_key: The PEM-encoded private key to use or None if no private key + should be used. + certificate_chain: The PEM-encoded certificate chain to use or None if no + certificate chain should be used. + + Returns: + A ClientCredentials for use with an SSL-enabled Channel. + """ + intermediary_low_credentials = _intermediary_low.ClientCredentials( + root_certificates, private_key, certificate_chain) + return ClientCredentials( + intermediary_low_credentials._internal, intermediary_low_credentials) # pylint: disable=protected-access + + class Channel(object): """A channel to a remote host through which RPCs may be conducted. @@ -73,7 +114,9 @@ class Channel(object): unsupported. """ - def __init__(self, low_channel): + def __init__(self, low_channel, intermediary_low_channel): + self._low_channel = low_channel + self._intermediary_low_channel = intermediary_low_channel self._connectivity_channel = _connectivity_channel.ConnectivityChannel( low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY) @@ -111,4 +154,336 @@ def create_insecure_channel(host, port): Returns: A Channel to the remote host through which RPCs may be conducted. """ - return Channel(_low.Channel('%s:%d' % (host, port), ())) + intermediary_low_channel = _intermediary_low.Channel( + '%s:%d' % (host, port), None) + return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access + + +def create_secure_channel(host, port, client_credentials): + """Creates a secure Channel to a remote host. + + Args: + host: The name of the remote host to which to connect. + port: The port of the remote host to which to connect. + client_credentials: A ClientCredentials. + + Returns: + A secure Channel to the remote host through which RPCs may be conducted. + """ + intermediary_low_channel = _intermediary_low.Channel( + '%s:%d' % (host, port), client_credentials.intermediary_low_credentials) + return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access + + +class StubOptions(object): + """A value encapsulating the various options for creation of a Stub. + + This class and its instances have no supported interface - it exists to define + the type of its instances and its instances exist to be passed to other + functions. + """ + + def __init__( + self, host, request_serializers, response_deserializers, + metadata_transformer, thread_pool, thread_pool_size): + self.host = host + self.request_serializers = request_serializers + self.response_deserializers = response_deserializers + self.metadata_transformer = metadata_transformer + self.thread_pool = thread_pool + self.thread_pool_size = thread_pool_size + +_EMPTY_STUB_OPTIONS = StubOptions( + None, None, None, None, None, None) + + +def stub_options( + host=None, request_serializers=None, response_deserializers=None, + metadata_transformer=None, thread_pool=None, thread_pool_size=None): + """Creates a StubOptions value to be passed at stub creation. + + All parameters are optional and should always be passed by keyword. + + Args: + host: A host string to set on RPC calls. + request_serializers: A dictionary from service name-method name pair to + request serialization behavior. + response_deserializers: A dictionary from service name-method name pair to + response deserialization behavior. + metadata_transformer: A callable that given a metadata object produces + another metadata object to be used in the underlying communication on the + wire. + thread_pool: A thread pool to use in stubs. + thread_pool_size: The size of thread pool to create for use in stubs; + ignored if thread_pool has been passed. + + Returns: + A StubOptions value created from the passed parameters. + """ + return StubOptions( + host, request_serializers, response_deserializers, + metadata_transformer, thread_pool, thread_pool_size) + + +def generic_stub(channel, options=None): + """Creates a face.GenericStub on which RPCs can be made. + + Args: + channel: A Channel for use by the created stub. + options: A StubOptions customizing the created stub. + + Returns: + A face.GenericStub on which RPCs can be made. + """ + effective_options = _EMPTY_STUB_OPTIONS if options is None else options + return _stub.generic_stub( + channel._intermediary_low_channel, effective_options.host, # pylint: disable=protected-access + effective_options.request_serializers, + effective_options.response_deserializers, effective_options.thread_pool, + effective_options.thread_pool_size) + + +def dynamic_stub(channel, service, cardinalities, options=None): + """Creates a face.DynamicStub with which RPCs can be invoked. + + Args: + channel: A Channel for the returned face.DynamicStub to use. + service: The package-qualified full name of the service. + cardinalities: A dictionary from RPC method name to cardinality.Cardinality + value identifying the cardinality of the RPC method. + options: An optional StubOptions value further customizing the functionality + of the returned face.DynamicStub. + + Returns: + A face.DynamicStub with which RPCs can be invoked. + """ + effective_options = StubOptions() if options is None else options + return _stub.dynamic_stub( + channel._intermediary_low_channel, effective_options.host, service, # pylint: disable=protected-access + cardinalities, effective_options.request_serializers, + effective_options.response_deserializers, effective_options.thread_pool, + effective_options.thread_pool_size) + + +class ServerCredentials(object): + """A value encapsulating the data required to open a secure port on a Server. + + This class and its instances have no supported interface - it exists to define + the type of its instances and its instances exist to be passed to other + functions. + """ + + def __init__(self, low_credentials, intermediary_low_credentials): + self._low_credentials = low_credentials + self._intermediary_low_credentials = intermediary_low_credentials + + +def ssl_server_credentials( + private_key_certificate_chain_pairs, root_certificates=None, + require_client_auth=False): + """Creates a ServerCredentials for use with an SSL-enabled Server. + + Args: + private_key_certificate_chain_pairs: A nonempty sequence each element of + which is a pair the first element of which is a PEM-encoded private key + and the second element of which is the corresponding PEM-encoded + certificate chain. + root_certificates: PEM-encoded client root certificates to be used for + verifying authenticated clients. If omitted, require_client_auth must also + be omitted or be False. + require_client_auth: A boolean indicating whether or not to require clients + to be authenticated. May only be True if root_certificates is not None. + + Returns: + A ServerCredentials for use with an SSL-enabled Server. + """ + if len(private_key_certificate_chain_pairs) == 0: + raise ValueError( + 'At least one private key-certificate chain pairis required!') + elif require_client_auth and root_certificates is None: + raise ValueError( + 'Illegal to require client auth without providing root certificates!') + else: + intermediary_low_credentials = _intermediary_low.ServerCredentials( + root_certificates, private_key_certificate_chain_pairs, + require_client_auth) + return ServerCredentials( + intermediary_low_credentials._internal, intermediary_low_credentials) # pylint: disable=protected-access + + +class Server(object): + """Services RPCs.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def add_insecure_port(self, address): + """Reserves a port for insecure RPC service once this Server becomes active. + + This method may only be called before calling this Server's start method is + called. + + Args: + address: The address for which to open a port. + + Returns: + An integer port on which RPCs will be serviced after this link has been + started. This is typically the same number as the port number contained + in the passed address, but will likely be different if the port number + contained in the passed address was zero. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_secure_port(self, address, server_credentials): + """Reserves a port for secure RPC service after this Server becomes active. + + This method may only be called before calling this Server's start method is + called. + + Args: + address: The address for which to open a port. + server_credentials: A ServerCredentials. + + Returns: + An integer port on which RPCs will be serviced after this link has been + started. This is typically the same number as the port number contained + in the passed address, but will likely be different if the port number + contained in the passed address was zero. + """ + raise NotImplementedError() + + @abc.abstractmethod + def start(self): + """Starts this Server's service of RPCs. + + This method may only be called while the server is not serving RPCs (i.e. it + is not idempotent). + """ + raise NotImplementedError() + + @abc.abstractmethod + def stop(self, grace): + """Stops this Server's service of RPCs. + + All calls to this method immediately stop service of new RPCs. When existing + RPCs are aborted is controlled by the grace period parameter passed to this + method. + + This method may be called at any time and is idempotent. Passing a smaller + grace value than has been passed in a previous call will have the effect of + stopping the Server sooner. Passing a larger grace value than has been + passed in a previous call will not have the effect of stopping the sooner + later. + + Args: + grace: A duration of time in seconds to allow existing RPCs to complete + before being aborted by this Server's stopping. May be zero for + immediate abortion of all in-progress RPCs. + + Returns: + A threading.Event that will be set when this Server has completely + stopped. The returned event may not be set until after the full grace + period (if some ongoing RPC continues for the full length of the period) + of it may be set much sooner (such as if this Server had no RPCs underway + at the time it was stopped or if all RPCs that it had underway completed + very early in the grace period). + """ + raise NotImplementedError() + + +class ServerOptions(object): + """A value encapsulating the various options for creation of a Server. + + This class and its instances have no supported interface - it exists to define + the type of its instances and its instances exist to be passed to other + functions. + """ + + def __init__( + self, multi_method_implementation, request_deserializers, + response_serializers, thread_pool, thread_pool_size, default_timeout, + maximum_timeout): + self.multi_method_implementation = multi_method_implementation + self.request_deserializers = request_deserializers + self.response_serializers = response_serializers + self.thread_pool = thread_pool + self.thread_pool_size = thread_pool_size + self.default_timeout = default_timeout + self.maximum_timeout = maximum_timeout + +_EMPTY_SERVER_OPTIONS = ServerOptions( + None, None, None, None, None, None, None) + + +def server_options( + multi_method_implementation=None, request_deserializers=None, + response_serializers=None, thread_pool=None, thread_pool_size=None, + default_timeout=None, maximum_timeout=None): + """Creates a ServerOptions value to be passed at server creation. + + All parameters are optional and should always be passed by keyword. + + Args: + multi_method_implementation: A face.MultiMethodImplementation to be called + to service an RPC if the server has no specific method implementation for + the name of the RPC for which service was requested. + request_deserializers: A dictionary from service name-method name pair to + request deserialization behavior. + response_serializers: A dictionary from service name-method name pair to + response serialization behavior. + thread_pool: A thread pool to use in stubs. + thread_pool_size: The size of thread pool to create for use in stubs; + ignored if thread_pool has been passed. + default_timeout: A duration in seconds to allow for RPC service when + servicing RPCs that did not include a timeout value when invoked. + maximum_timeout: A duration in seconds to allow for RPC service when + servicing RPCs no matter what timeout value was passed when the RPC was + invoked. + + Returns: + A StubOptions value created from the passed parameters. + """ + return ServerOptions( + multi_method_implementation, request_deserializers, response_serializers, + thread_pool, thread_pool_size, default_timeout, maximum_timeout) + + +class _Server(Server): + + def __init__(self, underserver): + self._underserver = underserver + + def add_insecure_port(self, address): + return self._underserver.add_insecure_port(address) + + def add_secure_port(self, address, server_credentials): + return self._underserver.add_secure_port( + address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access + + def start(self): + self._underserver.start() + + def stop(self, grace): + return self._underserver.stop(grace) + + +def server(service_implementations, options=None): + """Creates a Server with which RPCs can be serviced. + + Args: + service_implementations: A dictionary from service name-method name pair to + face.MethodImplementation. + options: An optional ServerOptions value further customizing the + functionality of the returned Server. + + Returns: + A Server with which RPCs can be serviced. + """ + effective_options = _EMPTY_SERVER_OPTIONS if options is None else options + underserver = _server.server( + service_implementations, effective_options.multi_method_implementation, + effective_options.request_deserializers, + effective_options.response_serializers, effective_options.thread_pool, + effective_options.thread_pool_size, effective_options.default_timeout, + effective_options.maximum_timeout) + return _Server(underserver) diff --git a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py new file mode 100644 index 00000000000..ce4c59c0eea --- /dev/null +++ b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py @@ -0,0 +1,137 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests Face interface compliance of the gRPC Python Beta API.""" + +import collections +import unittest + +from grpc._adapter import _intermediary_low +from grpc.beta import beta +from grpc_test import resources +from grpc_test import test_common as grpc_test_common +from grpc_test.beta import test_utilities +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.face import test_cases +from grpc_test.framework.interfaces.face import test_interfaces + +_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' + + +class _SerializationBehaviors( + collections.namedtuple( + '_SerializationBehaviors', + ('request_serializers', 'request_deserializers', 'response_serializers', + 'response_deserializers',))): + pass + + +def _serialization_behaviors_from_test_methods(test_methods): + request_serializers = {} + request_deserializers = {} + response_serializers = {} + response_deserializers = {} + for (group, method), test_method in test_methods.iteritems(): + request_serializers[group, method] = test_method.serialize_request + request_deserializers[group, method] = test_method.deserialize_request + response_serializers[group, method] = test_method.serialize_response + response_deserializers[group, method] = test_method.deserialize_response + return _SerializationBehaviors( + request_serializers, request_deserializers, response_serializers, + response_deserializers) + + +class _Implementation(test_interfaces.Implementation): + + def instantiate( + self, methods, method_implementations, multi_method_implementation): + serialization_behaviors = _serialization_behaviors_from_test_methods( + methods) + # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. + service = next(iter(methods))[0] + # TODO(nathaniel): Add a "cardinalities_by_group" attribute to + # _digest.TestServiceDigest. + cardinalities = { + method: method_object.cardinality() + for (group, method), method_object in methods.iteritems()} + + server_options = beta.server_options( + request_deserializers=serialization_behaviors.request_deserializers, + response_serializers=serialization_behaviors.response_serializers, + thread_pool_size=test_constants.POOL_SIZE) + server = beta.server(method_implementations, options=server_options) + server_credentials = beta.ssl_server_credentials( + [(resources.private_key(), resources.certificate_chain(),),]) + port = server.add_secure_port('[::]:0', server_credentials) + server.start() + client_credentials = beta.ssl_client_credentials( + resources.test_root_certificates(), None, None) + channel = test_utilities.create_not_really_secure_channel( + 'localhost', port, client_credentials, _SERVER_HOST_OVERRIDE) + stub_options = beta.stub_options( + request_serializers=serialization_behaviors.request_serializers, + response_deserializers=serialization_behaviors.response_deserializers, + thread_pool_size=test_constants.POOL_SIZE) + generic_stub = beta.generic_stub(channel, options=stub_options) + dynamic_stub = beta.dynamic_stub( + channel, service, cardinalities, options=stub_options) + return generic_stub, {service: dynamic_stub}, server + + def destantiate(self, memo): + memo.stop(test_constants.SHORT_TIMEOUT).wait() + + def invocation_metadata(self): + return grpc_test_common.INVOCATION_INITIAL_METADATA + + def initial_metadata(self): + return grpc_test_common.SERVICE_INITIAL_METADATA + + def terminal_metadata(self): + return grpc_test_common.SERVICE_TERMINAL_METADATA + + def code(self): + return _intermediary_low.Code.OK + + def details(self): + return grpc_test_common.DETAILS + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return original_metadata is None or grpc_test_common.metadata_transmitted( + original_metadata, transmitted_metadata) + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/beta/test_utilities.py b/src/python/grpcio_test/grpc_test/beta/test_utilities.py new file mode 100644 index 00000000000..338670478d5 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/beta/test_utilities.py @@ -0,0 +1,54 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test-appropriate entry points into the gRPC Python Beta API.""" + +from grpc._adapter import _intermediary_low +from grpc.beta import beta + + +def create_not_really_secure_channel( + host, port, client_credentials, server_host_override): + """Creates an insecure Channel to a remote host. + + Args: + host: The name of the remote host to which to connect. + port: The port of the remote host to which to connect. + client_credentials: The beta.ClientCredentials with which to connect. + server_host_override: The target name used for SSL host name checking. + + Returns: + A beta.Channel to the remote host through which RPCs may be conducted. + """ + hostport = '%s:%d' % (host, port) + intermediary_low_channel = _intermediary_low.Channel( + hostport, client_credentials._intermediary_low_credentials, + server_host_override=server_host_override) + return beta.Channel( + intermediary_low_channel._internal, intermediary_low_channel) diff --git a/src/python/grpcio_test/grpc_test/credentials/README b/src/python/grpcio_test/grpc_test/credentials/README new file mode 100644 index 00000000000..cb20dcb49fb --- /dev/null +++ b/src/python/grpcio_test/grpc_test/credentials/README @@ -0,0 +1 @@ +These are test keys *NOT* to be used in production. diff --git a/src/python/grpcio_test/grpc_test/credentials/ca.pem b/src/python/grpcio_test/grpc_test/credentials/ca.pem new file mode 100755 index 00000000000..6c8511a73c6 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/credentials/ca.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla +Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0 +YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT +BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7 ++L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu +g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd +Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV +HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau +sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m +oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG +Dfcog5wrJytaQ6UA0wE= +-----END CERTIFICATE----- diff --git a/src/python/grpcio_test/grpc_test/credentials/server1.key b/src/python/grpcio_test/grpc_test/credentials/server1.key new file mode 100755 index 00000000000..143a5b87658 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/credentials/server1.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD +M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf +3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY +AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm +V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY +tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p +dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q +K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR +81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff +DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd +aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2 +ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3 +XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe +F98XJ7tIFfJq +-----END PRIVATE KEY----- diff --git a/src/python/grpcio_test/grpc_test/credentials/server1.pem b/src/python/grpcio_test/grpc_test/credentials/server1.pem new file mode 100755 index 00000000000..8e582e571f6 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/credentials/server1.pem @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICmzCCAgSgAwIBAgIBAzANBgkqhkiG9w0BAQUFADBWMQswCQYDVQQGEwJBVTET +MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMQ8wDQYDVQQDDAZ0ZXN0Y2EwHhcNMTQwNzIyMDYwMDU3WhcNMjQwNzE5 +MDYwMDU3WjBkMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV +BAcTB0NoaWNhZ28xFDASBgNVBAoTC0dvb2dsZSBJbmMuMRowGAYDVQQDFBEqLnRl +c3QuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA4cMVJygs +JUmlgMMzgdi0h1XoCR7+ww1pop04OMMyy7H/i0PJ2W6Y35+b4CM8QrkYeEafUGDO +RYX6yV/cHGGsD/x02ye6ey1UDtkGAD/mpDEx8YCrjAc1Vfvt8Fk6Cn1WVIxV/J30 +3xjBsFgByQ55RBp1OLZfVLo6AleBDSbcxaECAwEAAaNrMGkwCQYDVR0TBAIwADAL +BgNVHQ8EBAMCBeAwTwYDVR0RBEgwRoIQKi50ZXN0Lmdvb2dsZS5mcoIYd2F0ZXJ6 +b29pLnRlc3QuZ29vZ2xlLmJlghIqLnRlc3QueW91dHViZS5jb22HBMCoAQMwDQYJ +KoZIhvcNAQEFBQADgYEAM2Ii0LgTGbJ1j4oqX9bxVcxm+/R5Yf8oi0aZqTJlnLYS +wXcBykxTx181s7WyfJ49WwrYXo78zTDAnf1ma0fPq3e4mpspvyndLh1a+OarHa1e +aT0DIIYk7qeEa1YcVljx2KyLd0r1BBAfrwyGaEPVeJQVYWaOJRU2we/KD4ojf9s= +-----END CERTIFICATE----- diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py index 8804f3f2233..b7dd5d4d171 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -73,6 +73,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): Overriding implementations must call this implementation. """ + self._invoker = None self.implementation.destantiate(self._memo) def testSuccessfulUnaryRequestUnaryResponse(self): diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py index 5a78b4bed24..7cb273bf787 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py @@ -74,6 +74,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): Overriding implementations must call this implementation. """ + self._invoker = None self.implementation.destantiate(self._memo) def testSuccessfulUnaryRequestUnaryResponse(self): diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index d1107e1576d..272a37f15f8 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -103,6 +103,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): Overriding implementations must call this implementation. """ + self._invoker = None self.implementation.destantiate(self._memo) self._digest_pool.shutdown(wait=True) diff --git a/src/python/grpcio_test/grpc_test/resources.py b/src/python/grpcio_test/grpc_test/resources.py new file mode 100644 index 00000000000..2c3045313d4 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/resources.py @@ -0,0 +1,56 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Constants and functions for data used in interoperability testing.""" + +import os + +import pkg_resources + +_ROOT_CERTIFICATES_RESOURCE_PATH = 'credentials/ca.pem' +_PRIVATE_KEY_RESOURCE_PATH = 'credentials/server1.key' +_CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem' + + +def test_root_certificates(): + return pkg_resources.resource_string( + __name__, _ROOT_CERTIFICATES_RESOURCE_PATH) + + +def prod_root_certificates(): + return open(os.environ['SSL_CERT_FILE'], mode='rb').read() + + +def private_key(): + return pkg_resources.resource_string(__name__, _PRIVATE_KEY_RESOURCE_PATH) + + +def certificate_chain(): + return pkg_resources.resource_string( + __name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py index 898ea204acf..802dd1e53af 100644 --- a/src/python/grpcio_test/setup.py +++ b/src/python/grpcio_test/setup.py @@ -55,6 +55,11 @@ _PACKAGE_DATA = { 'grpc_protoc_plugin': [ 'test.proto', ], + 'grpc_test': [ + 'credentials/ca.pem', + 'credentials/server1.key', + 'credentials/server1.pem', + ], } _SETUP_REQUIRES = ( diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh index fe5685f793e..977b02fd949 100755 --- a/tools/run_tests/run_python.sh +++ b/tools/run_tests/run_python.sh @@ -46,6 +46,7 @@ source "python"$PYVER"_virtual_environment"/bin/activate # the team... "python"$PYVER -m grpc_test._core_over_links_base_interface_test "python"$PYVER -m grpc_test._crust_over_core_over_links_face_interface_test +"python"$PYVER -m grpc_test.beta._face_interface_test "python"$PYVER -m grpc_test.framework._crust_over_core_face_interface_test "python"$PYVER -m grpc_test.framework.core._base_interface_test