The rest of the Python Beta API

pull/3134/head
Nathaniel Manista 9 years ago
commit 9e4d0610ea
  1. 85
      src/python/grpcio/grpc/_links/invocation.py
  2. 80
      src/python/grpcio/grpc/_links/service.py
  3. 112
      src/python/grpcio/grpc/beta/_server.py
  4. 109
      src/python/grpcio/grpc/beta/_stub.py
  5. 383
      src/python/grpcio/grpc/beta/beta.py
  6. 137
      src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
  7. 54
      src/python/grpcio_test/grpc_test/beta/test_utilities.py
  8. 1
      src/python/grpcio_test/grpc_test/credentials/README
  9. 15
      src/python/grpcio_test/grpc_test/credentials/ca.pem
  10. 16
      src/python/grpcio_test/grpc_test/credentials/server1.key
  11. 16
      src/python/grpcio_test/grpc_test/credentials/server1.pem
  12. 1
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
  13. 1
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
  14. 1
      src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
  15. 56
      src/python/grpcio_test/grpc_test/resources.py
  16. 5
      src/python/grpcio_test/setup.py
  17. 1
      tools/run_tests/run_python.sh

@ -41,6 +41,13 @@ from grpc.framework.foundation import logging_pool
from grpc.framework.foundation import relay
from grpc.framework.interfaces.links import links
_STOP = _intermediary_low.Event.Kind.STOP
_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED
_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED
_READ = _intermediary_low.Event.Kind.READ_ACCEPTED
_METADATA = _intermediary_low.Event.Kind.METADATA_ACCEPTED
_FINISH = _intermediary_low.Event.Kind.FINISH
@enum.unique
class _Read(enum.Enum):
@ -67,7 +74,7 @@ class _RPCState(object):
def __init__(
self, call, request_serializer, response_deserializer, sequence_number,
read, allowance, high_write, low_write):
read, allowance, high_write, low_write, due):
self.call = call
self.request_serializer = request_serializer
self.response_deserializer = response_deserializer
@ -76,6 +83,13 @@ class _RPCState(object):
self.allowance = allowance
self.high_write = high_write
self.low_write = low_write
self.due = due
def _no_longer_due(kind, rpc_state, key, rpc_states):
rpc_state.due.remove(kind)
if not rpc_state.due:
del rpc_states[key]
class _Kernel(object):
@ -91,12 +105,14 @@ class _Kernel(object):
self._relay = ticket_relay
self._completion_queue = None
self._rpc_states = None
self._rpc_states = {}
self._pool = None
def _on_write_event(self, operation_id, unused_event, rpc_state):
if rpc_state.high_write is _HighWrite.CLOSED:
rpc_state.call.complete(operation_id)
rpc_state.due.add(_COMPLETE)
rpc_state.due.remove(_WRITE)
rpc_state.low_write = _LowWrite.CLOSED
else:
ticket = links.Ticket(
@ -105,16 +121,19 @@ class _Kernel(object):
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
rpc_state.low_write = _LowWrite.OPEN
_no_longer_due(_WRITE, rpc_state, operation_id, self._rpc_states)
def _on_read_event(self, operation_id, event, rpc_state):
if event.bytes is None:
if event.bytes is None or _FINISH not in rpc_state.due:
rpc_state.read = _Read.CLOSED
_no_longer_due(_READ, rpc_state, operation_id, self._rpc_states)
else:
if 0 < rpc_state.allowance:
rpc_state.allowance -= 1
rpc_state.call.read(operation_id)
else:
rpc_state.read = _Read.AWAITING_ALLOWANCE
_no_longer_due(_READ, rpc_state, operation_id, self._rpc_states)
ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, None, None, None,
None, rpc_state.response_deserializer(event.bytes), None, None, None,
@ -123,18 +142,23 @@ class _Kernel(object):
self._relay.add_value(ticket)
def _on_metadata_event(self, operation_id, event, rpc_state):
if _FINISH in rpc_state.due:
rpc_state.allowance -= 1
rpc_state.call.read(operation_id)
rpc_state.read = _Read.READING
rpc_state.due.add(_READ)
rpc_state.due.remove(_METADATA)
ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None,
links.Ticket.Subscription.FULL, None, None, event.metadata, None, None,
None, None, None, None)
links.Ticket.Subscription.FULL, None, None, event.metadata, None,
None, None, None, None, None)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
else:
_no_longer_due(_METADATA, rpc_state, operation_id, self._rpc_states)
def _on_finish_event(self, operation_id, event, rpc_state):
self._rpc_states.pop(operation_id, None)
_no_longer_due(_FINISH, rpc_state, operation_id, self._rpc_states)
if event.status.code is _intermediary_low.Code.OK:
termination = links.Ticket.Termination.COMPLETION
elif event.status.code is _intermediary_low.Code.CANCELLED:
@ -155,27 +179,27 @@ class _Kernel(object):
def _spin(self, completion_queue):
while True:
event = completion_queue.get(None)
if event.kind is _intermediary_low.Event.Kind.STOP:
return
operation_id = event.tag
with self._lock:
if self._completion_queue is None:
continue
rpc_state = self._rpc_states.get(operation_id)
if rpc_state is not None:
if event.kind is _intermediary_low.Event.Kind.WRITE_ACCEPTED:
self._on_write_event(operation_id, event, rpc_state)
elif event.kind is _intermediary_low.Event.Kind.METADATA_ACCEPTED:
self._on_metadata_event(operation_id, event, rpc_state)
elif event.kind is _intermediary_low.Event.Kind.READ_ACCEPTED:
self._on_read_event(operation_id, event, rpc_state)
elif event.kind is _intermediary_low.Event.Kind.FINISH:
self._on_finish_event(operation_id, event, rpc_state)
elif event.kind is _intermediary_low.Event.Kind.COMPLETE_ACCEPTED:
rpc_state = self._rpc_states.get(event.tag, None)
if event.kind is _STOP:
pass
elif event.kind is _WRITE:
self._on_write_event(event.tag, event, rpc_state)
elif event.kind is _METADATA:
self._on_metadata_event(event.tag, event, rpc_state)
elif event.kind is _READ:
self._on_read_event(event.tag, event, rpc_state)
elif event.kind is _FINISH:
self._on_finish_event(event.tag, event, rpc_state)
elif event.kind is _COMPLETE:
_no_longer_due(_COMPLETE, rpc_state, event.tag, self._rpc_states)
else:
logging.error('Illegal RPC event! %s', (event,))
if self._completion_queue is None and not self._rpc_states:
completion_queue.stop()
return
def _invoke(
self, operation_id, group, method, initial_metadata, payload, termination,
timeout, allowance):
@ -221,26 +245,31 @@ class _Kernel(object):
if high_write is _HighWrite.CLOSED:
call.complete(operation_id)
low_write = _LowWrite.CLOSED
due = set((_METADATA, _COMPLETE, _FINISH,))
else:
low_write = _LowWrite.OPEN
due = set((_METADATA, _FINISH,))
else:
call.write(request_serializer(payload), operation_id)
low_write = _LowWrite.ACTIVE
due = set((_WRITE, _METADATA, _FINISH,))
self._rpc_states[operation_id] = _RPCState(
call, request_serializer, response_deserializer, 0,
_Read.AWAITING_METADATA, 1 if allowance is None else (1 + allowance),
high_write, low_write)
high_write, low_write, due)
def _advance(self, operation_id, rpc_state, payload, termination, allowance):
if payload is not None:
rpc_state.call.write(rpc_state.request_serializer(payload), operation_id)
rpc_state.low_write = _LowWrite.ACTIVE
rpc_state.due.add(_WRITE)
if allowance is not None:
if rpc_state.read is _Read.AWAITING_ALLOWANCE:
rpc_state.allowance += allowance - 1
rpc_state.call.read(operation_id)
rpc_state.read = _Read.READING
rpc_state.due.add(_READ)
else:
rpc_state.allowance += allowance
@ -248,15 +277,17 @@ class _Kernel(object):
rpc_state.high_write = _HighWrite.CLOSED
if rpc_state.low_write is _LowWrite.OPEN:
rpc_state.call.complete(operation_id)
rpc_state.due.add(_COMPLETE)
rpc_state.low_write = _LowWrite.CLOSED
elif termination is not None:
rpc_state.call.cancel()
def add_ticket(self, ticket):
with self._lock:
if self._completion_queue is None:
return
if ticket.sequence_number == 0:
if self._completion_queue is None:
logging.error('Received invocation ticket %s after stop!', ticket)
else:
self._invoke(
ticket.operation_id, ticket.group, ticket.method,
ticket.initial_metadata, ticket.payload, ticket.termination,
@ -276,7 +307,6 @@ class _Kernel(object):
"""
with self._lock:
self._completion_queue = _intermediary_low.CompletionQueue()
self._rpc_states = {}
self._pool = logging_pool.pool(1)
self._pool.submit(self._spin, self._completion_queue)
@ -288,11 +318,10 @@ class _Kernel(object):
has been called.
"""
with self._lock:
if not self._rpc_states:
self._completion_queue.stop()
self._completion_queue = None
pool = self._pool
self._pool = None
self._rpc_states = None
pool.shutdown(wait=True)

@ -53,6 +53,13 @@ _TERMINATION_KIND_TO_CODE = {
links.Ticket.Termination.REMOTE_FAILURE: _intermediary_low.Code.UNKNOWN,
}
_STOP = _intermediary_low.Event.Kind.STOP
_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED
_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED
_SERVICE = _intermediary_low.Event.Kind.SERVICE_ACCEPTED
_READ = _intermediary_low.Event.Kind.READ_ACCEPTED
_FINISH = _intermediary_low.Event.Kind.FINISH
@enum.unique
class _Read(enum.Enum):
@ -84,7 +91,7 @@ class _RPCState(object):
def __init__(
self, request_deserializer, response_serializer, sequence_number, read,
early_read, allowance, high_write, low_write, premetadataed,
terminal_metadata, code, message):
terminal_metadata, code, message, due):
self.request_deserializer = request_deserializer
self.response_serializer = response_serializer
self.sequence_number = sequence_number
@ -99,6 +106,13 @@ class _RPCState(object):
self.terminal_metadata = terminal_metadata
self.code = code
self.message = message
self.due = due
def _no_longer_due(kind, rpc_state, key, rpc_states):
rpc_state.due.remove(kind)
if not rpc_state.due:
del rpc_states[key]
def _metadatafy(call, metadata):
@ -124,6 +138,7 @@ class _Kernel(object):
self._relay = ticket_relay
self._completion_queue = None
self._due = set()
self._server = None
self._rpc_states = {}
self._pool = None
@ -149,7 +164,8 @@ class _Kernel(object):
call.read(call)
self._rpc_states[call] = _RPCState(
request_deserializer, response_serializer, 1, _Read.READING, None, 1,
_HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None)
_HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None,
set((_READ, _FINISH,)))
ticket = links.Ticket(
call, 0, group, method, links.Ticket.Subscription.FULL,
service_acceptance.deadline - time.time(), None, event.metadata, None,
@ -158,14 +174,13 @@ class _Kernel(object):
def _on_read_event(self, event):
call = event.tag
rpc_state = self._rpc_states.get(call, None)
if rpc_state is None:
return
rpc_state = self._rpc_states[call]
if event.bytes is None:
rpc_state.read = _Read.CLOSED
payload = None
termination = links.Ticket.Termination.COMPLETION
_no_longer_due(_READ, rpc_state, call, self._rpc_states)
else:
if 0 < rpc_state.allowance:
payload = rpc_state.request_deserializer(event.bytes)
@ -174,6 +189,7 @@ class _Kernel(object):
call.read(call)
else:
rpc_state.early_read = event.bytes
_no_longer_due(_READ, rpc_state, call, self._rpc_states)
return
# TODO(issue 2916): Instead of returning:
# rpc_state.read = _Read.AWAITING_ALLOWANCE
@ -185,9 +201,7 @@ class _Kernel(object):
def _on_write_event(self, event):
call = event.tag
rpc_state = self._rpc_states.get(call, None)
if rpc_state is None:
return
rpc_state = self._rpc_states[call]
if rpc_state.high_write is _HighWrite.CLOSED:
if rpc_state.terminal_metadata is not None:
@ -197,6 +211,8 @@ class _Kernel(object):
rpc_state.message)
call.status(status, call)
rpc_state.low_write = _LowWrite.CLOSED
rpc_state.due.add(_COMPLETE)
rpc_state.due.remove(_WRITE)
else:
ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, 1, None,
@ -204,12 +220,12 @@ class _Kernel(object):
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
rpc_state.low_write = _LowWrite.OPEN
_no_longer_due(_WRITE, rpc_state, call, self._rpc_states)
def _on_finish_event(self, event):
call = event.tag
rpc_state = self._rpc_states.pop(call, None)
if rpc_state is None:
return
rpc_state = self._rpc_states[call]
_no_longer_due(_FINISH, rpc_state, call, self._rpc_states)
code = event.status.code
if code is _intermediary_low.Code.OK:
return
@ -229,28 +245,33 @@ class _Kernel(object):
def _spin(self, completion_queue, server):
while True:
event = completion_queue.get(None)
if event.kind is _intermediary_low.Event.Kind.STOP:
return
with self._lock:
if self._server is None:
continue
elif event.kind is _intermediary_low.Event.Kind.SERVICE_ACCEPTED:
self._on_service_acceptance_event(event, server)
elif event.kind is _intermediary_low.Event.Kind.READ_ACCEPTED:
if event.kind is _STOP:
self._due.remove(_STOP)
elif event.kind is _READ:
self._on_read_event(event)
elif event.kind is _intermediary_low.Event.Kind.WRITE_ACCEPTED:
elif event.kind is _WRITE:
self._on_write_event(event)
elif event.kind is _intermediary_low.Event.Kind.COMPLETE_ACCEPTED:
pass
elif event.kind is _COMPLETE:
_no_longer_due(
_COMPLETE, self._rpc_states.get(event.tag), event.tag,
self._rpc_states)
elif event.kind is _intermediary_low.Event.Kind.FINISH:
self._on_finish_event(event)
elif event.kind is _SERVICE:
if self._server is None:
self._due.remove(_SERVICE)
else:
self._on_service_acceptance_event(event, server)
else:
logging.error('Illegal event! %s', (event,))
if not self._due and not self._rpc_states:
completion_queue.stop()
return
def add_ticket(self, ticket):
with self._lock:
if self._server is None:
return
call = ticket.operation_id
rpc_state = self._rpc_states.get(call)
if rpc_state is None:
@ -278,6 +299,7 @@ class _Kernel(object):
rpc_state.early_read = None
if rpc_state.read is _Read.READING:
call.read(call)
rpc_state.due.add(_READ)
termination = None
else:
termination = links.Ticket.Termination.COMPLETION
@ -289,6 +311,7 @@ class _Kernel(object):
if ticket.payload is not None:
call.write(rpc_state.response_serializer(ticket.payload), call)
rpc_state.due.add(_WRITE)
rpc_state.low_write = _LowWrite.ACTIVE
if ticket.terminal_metadata is not None:
@ -307,6 +330,7 @@ class _Kernel(object):
links.Ticket.Termination.COMPLETION, rpc_state.code,
rpc_state.message)
call.status(status, call)
rpc_state.due.add(_COMPLETE)
rpc_state.low_write = _LowWrite.CLOSED
elif ticket.termination is not None:
if rpc_state.terminal_metadata is not None:
@ -314,7 +338,7 @@ class _Kernel(object):
status = _status(
ticket.termination, rpc_state.code, rpc_state.message)
call.status(status, call)
self._rpc_states.pop(call, None)
rpc_state.due.add(_COMPLETE)
def add_port(self, address, server_credentials):
with self._lock:
@ -335,19 +359,17 @@ class _Kernel(object):
self._pool.submit(self._spin, self._completion_queue, self._server)
self._server.start()
self._server.service(None)
self._due.add(_SERVICE)
def begin_stop(self):
with self._lock:
self._server.stop()
self._due.add(_STOP)
self._server = None
def end_stop(self):
with self._lock:
self._completion_queue.stop()
self._completion_queue = None
pool = self._pool
self._pool = None
self._rpc_states = None
pool.shutdown(wait=True)
@ -369,7 +391,7 @@ class ServiceLink(links.Link):
None for insecure service.
Returns:
A integer port on which RPCs will be serviced after this link has been
An integer port on which RPCs will be serviced after this link has been
started. This is typically the same number as the port number contained
in the passed address, but will likely be different if the port number
contained in the passed address was zero.

@ -0,0 +1,112 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Beta API server implementation."""
import threading
from grpc._links import service
from grpc.framework.core import implementations as _core_implementations
from grpc.framework.crust import implementations as _crust_implementations
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.links import utilities
_DEFAULT_POOL_SIZE = 8
_DEFAULT_TIMEOUT = 300
_MAXIMUM_TIMEOUT = 24 * 60 * 60
def _disassemble(grpc_link, end_link, pool, event, grace):
grpc_link.begin_stop()
end_link.stop(grace).wait()
grpc_link.end_stop()
grpc_link.join_link(utilities.NULL_LINK)
end_link.join_link(utilities.NULL_LINK)
if pool is not None:
pool.shutdown(wait=True)
event.set()
class Server(object):
def __init__(self, grpc_link, end_link, pool):
self._grpc_link = grpc_link
self._end_link = end_link
self._pool = pool
def add_insecure_port(self, address):
return self._grpc_link.add_port(address, None)
def add_secure_port(self, address, intermediary_low_server_credentials):
return self._grpc_link.add_port(
address, intermediary_low_server_credentials)
def start(self):
self._grpc_link.join_link(self._end_link)
self._end_link.join_link(self._grpc_link)
self._grpc_link.start()
self._end_link.start()
def stop(self, grace):
stop_event = threading.Event()
if 0 < grace:
disassembly_thread = threading.Thread(
target=_disassemble,
args=(
self._grpc_link, self._end_link, self._pool, stop_event, grace,))
disassembly_thread.start()
return stop_event
else:
_disassemble(self._grpc_link, self._end_link, self._pool, stop_event, 0)
return stop_event
def server(
implementations, multi_implementation, request_deserializers,
response_serializers, thread_pool, thread_pool_size, default_timeout,
maximum_timeout):
if thread_pool is None:
service_thread_pool = logging_pool.pool(
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
assembly_thread_pool = service_thread_pool
else:
service_thread_pool = thread_pool
assembly_thread_pool = None
servicer = _crust_implementations.servicer(
implementations, multi_implementation, service_thread_pool)
grpc_link = service.service_link(request_deserializers, response_serializers)
end_link = _core_implementations.service_end_link(
servicer,
_DEFAULT_TIMEOUT if default_timeout is None else default_timeout,
_MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout)
return Server(grpc_link, end_link, assembly_thread_pool)

@ -0,0 +1,109 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Beta API stub implementation."""
import threading
from grpc._links import invocation
from grpc.framework.core import implementations as _core_implementations
from grpc.framework.crust import implementations as _crust_implementations
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.links import utilities
_DEFAULT_POOL_SIZE = 6
class _AutoIntermediary(object):
def __init__(self, delegate, on_deletion):
self._delegate = delegate
self._on_deletion = on_deletion
def __getattr__(self, attr):
return getattr(self._delegate, attr)
def __del__(self):
self._on_deletion()
def _assemble(
channel, host, request_serializers, response_deserializers, thread_pool,
thread_pool_size):
end_link = _core_implementations.invocation_end_link()
grpc_link = invocation.invocation_link(
channel, host, request_serializers, response_deserializers)
if thread_pool is None:
invocation_pool = logging_pool.pool(
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
assembly_pool = invocation_pool
else:
invocation_pool = thread_pool
assembly_pool = None
end_link.join_link(grpc_link)
grpc_link.join_link(end_link)
end_link.start()
grpc_link.start()
return end_link, grpc_link, invocation_pool, assembly_pool
def _disassemble(end_link, grpc_link, pool):
end_link.stop(24 * 60 * 60).wait()
grpc_link.stop()
end_link.join_link(utilities.NULL_LINK)
grpc_link.join_link(utilities.NULL_LINK)
if pool is not None:
pool.shutdown(wait=True)
def _wrap_assembly(stub, end_link, grpc_link, assembly_pool):
disassembly_thread = threading.Thread(
target=_disassemble, args=(end_link, grpc_link, assembly_pool))
return _AutoIntermediary(stub, disassembly_thread.start)
def generic_stub(
channel, host, request_serializers, response_deserializers, thread_pool,
thread_pool_size):
end_link, grpc_link, invocation_pool, assembly_pool = _assemble(
channel, host, request_serializers, response_deserializers, thread_pool,
thread_pool_size)
stub = _crust_implementations.generic_stub(end_link, invocation_pool)
return _wrap_assembly(stub, end_link, grpc_link, assembly_pool)
def dynamic_stub(
channel, host, service, cardinalities, request_serializers,
response_deserializers, thread_pool, thread_pool_size):
end_link, grpc_link, invocation_pool, assembly_pool = _assemble(
channel, host, request_serializers, response_deserializers, thread_pool,
thread_pool_size)
stub = _crust_implementations.dynamic_stub(
end_link, service, cardinalities, invocation_pool)
return _wrap_assembly(stub, end_link, grpc_link, assembly_pool)

@ -27,13 +27,21 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into gRPC Python Beta."""
"""Entry points into the Beta API of gRPC Python."""
# threading is referenced from specification in this module.
import abc
import enum
import threading # pylint: disable=unused-import
from grpc._adapter import _low
# cardinality and face are referenced from specification in this module.
from grpc._adapter import _intermediary_low
from grpc._adapter import _types
from grpc.beta import _connectivity_channel
from grpc.beta import _server
from grpc.beta import _stub
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.interfaces.face import face # pylint: disable=unused-import
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
@ -65,6 +73,39 @@ _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
}
class ClientCredentials(object):
"""A value encapsulating the data required to create a secure Channel.
This class and its instances have no supported interface - it exists to define
the type of its instances and its instances exist to be passed to other
functions.
"""
def __init__(self, low_credentials, intermediary_low_credentials):
self._low_credentials = low_credentials
self._intermediary_low_credentials = intermediary_low_credentials
def ssl_client_credentials(root_certificates, private_key, certificate_chain):
"""Creates a ClientCredentials for use with an SSL-enabled Channel.
Args:
root_certificates: The PEM-encoded root certificates or None to ask for
them to be retrieved from a default location.
private_key: The PEM-encoded private key to use or None if no private key
should be used.
certificate_chain: The PEM-encoded certificate chain to use or None if no
certificate chain should be used.
Returns:
A ClientCredentials for use with an SSL-enabled Channel.
"""
intermediary_low_credentials = _intermediary_low.ClientCredentials(
root_certificates, private_key, certificate_chain)
return ClientCredentials(
intermediary_low_credentials._internal, intermediary_low_credentials) # pylint: disable=protected-access
class Channel(object):
"""A channel to a remote host through which RPCs may be conducted.
@ -73,7 +114,9 @@ class Channel(object):
unsupported.
"""
def __init__(self, low_channel):
def __init__(self, low_channel, intermediary_low_channel):
self._low_channel = low_channel
self._intermediary_low_channel = intermediary_low_channel
self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
@ -111,4 +154,336 @@ def create_insecure_channel(host, port):
Returns:
A Channel to the remote host through which RPCs may be conducted.
"""
return Channel(_low.Channel('%s:%d' % (host, port), ()))
intermediary_low_channel = _intermediary_low.Channel(
'%s:%d' % (host, port), None)
return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access
def create_secure_channel(host, port, client_credentials):
"""Creates a secure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
client_credentials: A ClientCredentials.
Returns:
A secure Channel to the remote host through which RPCs may be conducted.
"""
intermediary_low_channel = _intermediary_low.Channel(
'%s:%d' % (host, port), client_credentials.intermediary_low_credentials)
return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access
class StubOptions(object):
"""A value encapsulating the various options for creation of a Stub.
This class and its instances have no supported interface - it exists to define
the type of its instances and its instances exist to be passed to other
functions.
"""
def __init__(
self, host, request_serializers, response_deserializers,
metadata_transformer, thread_pool, thread_pool_size):
self.host = host
self.request_serializers = request_serializers
self.response_deserializers = response_deserializers
self.metadata_transformer = metadata_transformer
self.thread_pool = thread_pool
self.thread_pool_size = thread_pool_size
_EMPTY_STUB_OPTIONS = StubOptions(
None, None, None, None, None, None)
def stub_options(
host=None, request_serializers=None, response_deserializers=None,
metadata_transformer=None, thread_pool=None, thread_pool_size=None):
"""Creates a StubOptions value to be passed at stub creation.
All parameters are optional and should always be passed by keyword.
Args:
host: A host string to set on RPC calls.
request_serializers: A dictionary from service name-method name pair to
request serialization behavior.
response_deserializers: A dictionary from service name-method name pair to
response deserialization behavior.
metadata_transformer: A callable that given a metadata object produces
another metadata object to be used in the underlying communication on the
wire.
thread_pool: A thread pool to use in stubs.
thread_pool_size: The size of thread pool to create for use in stubs;
ignored if thread_pool has been passed.
Returns:
A StubOptions value created from the passed parameters.
"""
return StubOptions(
host, request_serializers, response_deserializers,
metadata_transformer, thread_pool, thread_pool_size)
def generic_stub(channel, options=None):
"""Creates a face.GenericStub on which RPCs can be made.
Args:
channel: A Channel for use by the created stub.
options: A StubOptions customizing the created stub.
Returns:
A face.GenericStub on which RPCs can be made.
"""
effective_options = _EMPTY_STUB_OPTIONS if options is None else options
return _stub.generic_stub(
channel._intermediary_low_channel, effective_options.host, # pylint: disable=protected-access
effective_options.request_serializers,
effective_options.response_deserializers, effective_options.thread_pool,
effective_options.thread_pool_size)
def dynamic_stub(channel, service, cardinalities, options=None):
"""Creates a face.DynamicStub with which RPCs can be invoked.
Args:
channel: A Channel for the returned face.DynamicStub to use.
service: The package-qualified full name of the service.
cardinalities: A dictionary from RPC method name to cardinality.Cardinality
value identifying the cardinality of the RPC method.
options: An optional StubOptions value further customizing the functionality
of the returned face.DynamicStub.
Returns:
A face.DynamicStub with which RPCs can be invoked.
"""
effective_options = StubOptions() if options is None else options
return _stub.dynamic_stub(
channel._intermediary_low_channel, effective_options.host, service, # pylint: disable=protected-access
cardinalities, effective_options.request_serializers,
effective_options.response_deserializers, effective_options.thread_pool,
effective_options.thread_pool_size)
class ServerCredentials(object):
"""A value encapsulating the data required to open a secure port on a Server.
This class and its instances have no supported interface - it exists to define
the type of its instances and its instances exist to be passed to other
functions.
"""
def __init__(self, low_credentials, intermediary_low_credentials):
self._low_credentials = low_credentials
self._intermediary_low_credentials = intermediary_low_credentials
def ssl_server_credentials(
private_key_certificate_chain_pairs, root_certificates=None,
require_client_auth=False):
"""Creates a ServerCredentials for use with an SSL-enabled Server.
Args:
private_key_certificate_chain_pairs: A nonempty sequence each element of
which is a pair the first element of which is a PEM-encoded private key
and the second element of which is the corresponding PEM-encoded
certificate chain.
root_certificates: PEM-encoded client root certificates to be used for
verifying authenticated clients. If omitted, require_client_auth must also
be omitted or be False.
require_client_auth: A boolean indicating whether or not to require clients
to be authenticated. May only be True if root_certificates is not None.
Returns:
A ServerCredentials for use with an SSL-enabled Server.
"""
if len(private_key_certificate_chain_pairs) == 0:
raise ValueError(
'At least one private key-certificate chain pairis required!')
elif require_client_auth and root_certificates is None:
raise ValueError(
'Illegal to require client auth without providing root certificates!')
else:
intermediary_low_credentials = _intermediary_low.ServerCredentials(
root_certificates, private_key_certificate_chain_pairs,
require_client_auth)
return ServerCredentials(
intermediary_low_credentials._internal, intermediary_low_credentials) # pylint: disable=protected-access
class Server(object):
"""Services RPCs."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def add_insecure_port(self, address):
"""Reserves a port for insecure RPC service once this Server becomes active.
This method may only be called before calling this Server's start method is
called.
Args:
address: The address for which to open a port.
Returns:
An integer port on which RPCs will be serviced after this link has been
started. This is typically the same number as the port number contained
in the passed address, but will likely be different if the port number
contained in the passed address was zero.
"""
raise NotImplementedError()
@abc.abstractmethod
def add_secure_port(self, address, server_credentials):
"""Reserves a port for secure RPC service after this Server becomes active.
This method may only be called before calling this Server's start method is
called.
Args:
address: The address for which to open a port.
server_credentials: A ServerCredentials.
Returns:
An integer port on which RPCs will be serviced after this link has been
started. This is typically the same number as the port number contained
in the passed address, but will likely be different if the port number
contained in the passed address was zero.
"""
raise NotImplementedError()
@abc.abstractmethod
def start(self):
"""Starts this Server's service of RPCs.
This method may only be called while the server is not serving RPCs (i.e. it
is not idempotent).
"""
raise NotImplementedError()
@abc.abstractmethod
def stop(self, grace):
"""Stops this Server's service of RPCs.
All calls to this method immediately stop service of new RPCs. When existing
RPCs are aborted is controlled by the grace period parameter passed to this
method.
This method may be called at any time and is idempotent. Passing a smaller
grace value than has been passed in a previous call will have the effect of
stopping the Server sooner. Passing a larger grace value than has been
passed in a previous call will not have the effect of stopping the sooner
later.
Args:
grace: A duration of time in seconds to allow existing RPCs to complete
before being aborted by this Server's stopping. May be zero for
immediate abortion of all in-progress RPCs.
Returns:
A threading.Event that will be set when this Server has completely
stopped. The returned event may not be set until after the full grace
period (if some ongoing RPC continues for the full length of the period)
of it may be set much sooner (such as if this Server had no RPCs underway
at the time it was stopped or if all RPCs that it had underway completed
very early in the grace period).
"""
raise NotImplementedError()
class ServerOptions(object):
"""A value encapsulating the various options for creation of a Server.
This class and its instances have no supported interface - it exists to define
the type of its instances and its instances exist to be passed to other
functions.
"""
def __init__(
self, multi_method_implementation, request_deserializers,
response_serializers, thread_pool, thread_pool_size, default_timeout,
maximum_timeout):
self.multi_method_implementation = multi_method_implementation
self.request_deserializers = request_deserializers
self.response_serializers = response_serializers
self.thread_pool = thread_pool
self.thread_pool_size = thread_pool_size
self.default_timeout = default_timeout
self.maximum_timeout = maximum_timeout
_EMPTY_SERVER_OPTIONS = ServerOptions(
None, None, None, None, None, None, None)
def server_options(
multi_method_implementation=None, request_deserializers=None,
response_serializers=None, thread_pool=None, thread_pool_size=None,
default_timeout=None, maximum_timeout=None):
"""Creates a ServerOptions value to be passed at server creation.
All parameters are optional and should always be passed by keyword.
Args:
multi_method_implementation: A face.MultiMethodImplementation to be called
to service an RPC if the server has no specific method implementation for
the name of the RPC for which service was requested.
request_deserializers: A dictionary from service name-method name pair to
request deserialization behavior.
response_serializers: A dictionary from service name-method name pair to
response serialization behavior.
thread_pool: A thread pool to use in stubs.
thread_pool_size: The size of thread pool to create for use in stubs;
ignored if thread_pool has been passed.
default_timeout: A duration in seconds to allow for RPC service when
servicing RPCs that did not include a timeout value when invoked.
maximum_timeout: A duration in seconds to allow for RPC service when
servicing RPCs no matter what timeout value was passed when the RPC was
invoked.
Returns:
A StubOptions value created from the passed parameters.
"""
return ServerOptions(
multi_method_implementation, request_deserializers, response_serializers,
thread_pool, thread_pool_size, default_timeout, maximum_timeout)
class _Server(Server):
def __init__(self, underserver):
self._underserver = underserver
def add_insecure_port(self, address):
return self._underserver.add_insecure_port(address)
def add_secure_port(self, address, server_credentials):
return self._underserver.add_secure_port(
address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
def start(self):
self._underserver.start()
def stop(self, grace):
return self._underserver.stop(grace)
def server(service_implementations, options=None):
"""Creates a Server with which RPCs can be serviced.
Args:
service_implementations: A dictionary from service name-method name pair to
face.MethodImplementation.
options: An optional ServerOptions value further customizing the
functionality of the returned Server.
Returns:
A Server with which RPCs can be serviced.
"""
effective_options = _EMPTY_SERVER_OPTIONS if options is None else options
underserver = _server.server(
service_implementations, effective_options.multi_method_implementation,
effective_options.request_deserializers,
effective_options.response_serializers, effective_options.thread_pool,
effective_options.thread_pool_size, effective_options.default_timeout,
effective_options.maximum_timeout)
return _Server(underserver)

@ -0,0 +1,137 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests Face interface compliance of the gRPC Python Beta API."""
import collections
import unittest
from grpc._adapter import _intermediary_low
from grpc.beta import beta
from grpc_test import resources
from grpc_test import test_common as grpc_test_common
from grpc_test.beta import test_utilities
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.face import test_cases
from grpc_test.framework.interfaces.face import test_interfaces
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
class _SerializationBehaviors(
collections.namedtuple(
'_SerializationBehaviors',
('request_serializers', 'request_deserializers', 'response_serializers',
'response_deserializers',))):
pass
def _serialization_behaviors_from_test_methods(test_methods):
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for (group, method), test_method in test_methods.iteritems():
request_serializers[group, method] = test_method.serialize_request
request_deserializers[group, method] = test_method.deserialize_request
response_serializers[group, method] = test_method.serialize_response
response_deserializers[group, method] = test_method.deserialize_response
return _SerializationBehaviors(
request_serializers, request_deserializers, response_serializers,
response_deserializers)
class _Implementation(test_interfaces.Implementation):
def instantiate(
self, methods, method_implementations, multi_method_implementation):
serialization_behaviors = _serialization_behaviors_from_test_methods(
methods)
# TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
service = next(iter(methods))[0]
# TODO(nathaniel): Add a "cardinalities_by_group" attribute to
# _digest.TestServiceDigest.
cardinalities = {
method: method_object.cardinality()
for (group, method), method_object in methods.iteritems()}
server_options = beta.server_options(
request_deserializers=serialization_behaviors.request_deserializers,
response_serializers=serialization_behaviors.response_serializers,
thread_pool_size=test_constants.POOL_SIZE)
server = beta.server(method_implementations, options=server_options)
server_credentials = beta.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain(),),])
port = server.add_secure_port('[::]:0', server_credentials)
server.start()
client_credentials = beta.ssl_client_credentials(
resources.test_root_certificates(), None, None)
channel = test_utilities.create_not_really_secure_channel(
'localhost', port, client_credentials, _SERVER_HOST_OVERRIDE)
stub_options = beta.stub_options(
request_serializers=serialization_behaviors.request_serializers,
response_deserializers=serialization_behaviors.response_deserializers,
thread_pool_size=test_constants.POOL_SIZE)
generic_stub = beta.generic_stub(channel, options=stub_options)
dynamic_stub = beta.dynamic_stub(
channel, service, cardinalities, options=stub_options)
return generic_stub, {service: dynamic_stub}, server
def destantiate(self, memo):
memo.stop(test_constants.SHORT_TIMEOUT).wait()
def invocation_metadata(self):
return grpc_test_common.INVOCATION_INITIAL_METADATA
def initial_metadata(self):
return grpc_test_common.SERVICE_INITIAL_METADATA
def terminal_metadata(self):
return grpc_test_common.SERVICE_TERMINAL_METADATA
def code(self):
return _intermediary_low.Code.OK
def details(self):
return grpc_test_common.DETAILS
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
original_metadata, transmitted_metadata)
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(
loader.loadTestsFromTestCase(test_case_class)
for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,54 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test-appropriate entry points into the gRPC Python Beta API."""
from grpc._adapter import _intermediary_low
from grpc.beta import beta
def create_not_really_secure_channel(
host, port, client_credentials, server_host_override):
"""Creates an insecure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
client_credentials: The beta.ClientCredentials with which to connect.
server_host_override: The target name used for SSL host name checking.
Returns:
A beta.Channel to the remote host through which RPCs may be conducted.
"""
hostport = '%s:%d' % (host, port)
intermediary_low_channel = _intermediary_low.Channel(
hostport, client_credentials._intermediary_low_credentials,
server_host_override=server_host_override)
return beta.Channel(
intermediary_low_channel._internal, intermediary_low_channel)

@ -0,0 +1 @@
These are test keys *NOT* to be used in production.

@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla
Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0
YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT
BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7
+L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu
g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd
Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV
HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau
sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m
oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG
Dfcog5wrJytaQ6UA0wE=
-----END CERTIFICATE-----

@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD
M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf
3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY
AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm
V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY
tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p
dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q
K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR
81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff
DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd
aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2
ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3
XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe
F98XJ7tIFfJq
-----END PRIVATE KEY-----

@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICmzCCAgSgAwIBAgIBAzANBgkqhkiG9w0BAQUFADBWMQswCQYDVQQGEwJBVTET
MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ
dHkgTHRkMQ8wDQYDVQQDDAZ0ZXN0Y2EwHhcNMTQwNzIyMDYwMDU3WhcNMjQwNzE5
MDYwMDU3WjBkMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV
BAcTB0NoaWNhZ28xFDASBgNVBAoTC0dvb2dsZSBJbmMuMRowGAYDVQQDFBEqLnRl
c3QuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA4cMVJygs
JUmlgMMzgdi0h1XoCR7+ww1pop04OMMyy7H/i0PJ2W6Y35+b4CM8QrkYeEafUGDO
RYX6yV/cHGGsD/x02ye6ey1UDtkGAD/mpDEx8YCrjAc1Vfvt8Fk6Cn1WVIxV/J30
3xjBsFgByQ55RBp1OLZfVLo6AleBDSbcxaECAwEAAaNrMGkwCQYDVR0TBAIwADAL
BgNVHQ8EBAMCBeAwTwYDVR0RBEgwRoIQKi50ZXN0Lmdvb2dsZS5mcoIYd2F0ZXJ6
b29pLnRlc3QuZ29vZ2xlLmJlghIqLnRlc3QueW91dHViZS5jb22HBMCoAQMwDQYJ
KoZIhvcNAQEFBQADgYEAM2Ii0LgTGbJ1j4oqX9bxVcxm+/R5Yf8oi0aZqTJlnLYS
wXcBykxTx181s7WyfJ49WwrYXo78zTDAnf1ma0fPq3e4mpspvyndLh1a+OarHa1e
aT0DIIYk7qeEa1YcVljx2KyLd0r1BBAfrwyGaEPVeJQVYWaOJRU2we/KD4ojf9s=
-----END CERTIFICATE-----

@ -73,6 +73,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
Overriding implementations must call this implementation.
"""
self._invoker = None
self.implementation.destantiate(self._memo)
def testSuccessfulUnaryRequestUnaryResponse(self):

@ -74,6 +74,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
Overriding implementations must call this implementation.
"""
self._invoker = None
self.implementation.destantiate(self._memo)
def testSuccessfulUnaryRequestUnaryResponse(self):

@ -103,6 +103,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
Overriding implementations must call this implementation.
"""
self._invoker = None
self.implementation.destantiate(self._memo)
self._digest_pool.shutdown(wait=True)

@ -0,0 +1,56 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Constants and functions for data used in interoperability testing."""
import os
import pkg_resources
_ROOT_CERTIFICATES_RESOURCE_PATH = 'credentials/ca.pem'
_PRIVATE_KEY_RESOURCE_PATH = 'credentials/server1.key'
_CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem'
def test_root_certificates():
return pkg_resources.resource_string(
__name__, _ROOT_CERTIFICATES_RESOURCE_PATH)
def prod_root_certificates():
return open(os.environ['SSL_CERT_FILE'], mode='rb').read()
def private_key():
return pkg_resources.resource_string(__name__, _PRIVATE_KEY_RESOURCE_PATH)
def certificate_chain():
return pkg_resources.resource_string(
__name__, _CERTIFICATE_CHAIN_RESOURCE_PATH)

@ -55,6 +55,11 @@ _PACKAGE_DATA = {
'grpc_protoc_plugin': [
'test.proto',
],
'grpc_test': [
'credentials/ca.pem',
'credentials/server1.key',
'credentials/server1.pem',
],
}
_SETUP_REQUIRES = (

@ -46,6 +46,7 @@ source "python"$PYVER"_virtual_environment"/bin/activate
# the team...
"python"$PYVER -m grpc_test._core_over_links_base_interface_test
"python"$PYVER -m grpc_test._crust_over_core_over_links_face_interface_test
"python"$PYVER -m grpc_test.beta._face_interface_test
"python"$PYVER -m grpc_test.framework._crust_over_core_face_interface_test
"python"$PYVER -m grpc_test.framework.core._base_interface_test

Loading…
Cancel
Save