Thread-pool-less construction of GRPC links.

These will be used in generated code in circumstances in which we
don't necessarily want to be asking calling code to have a thread
pool readily available.
pull/709/head
Nathaniel Manista 10 years ago
parent 6e8d15e7de
commit e04e20aaca
  1. 90
      src/python/src/grpc/_adapter/fore.py
  2. 75
      src/python/src/grpc/_adapter/rear.py

@ -41,6 +41,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
_THREAD_POOL_SIZE = 100
@enum.unique
@ -353,3 +356,90 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
self._complete(ticket.operation_id, ticket.payload)
else:
self._cancel(ticket.operation_id)
class _ActivatedForeLink(ticket_interfaces.ForeLink, activated.Activated):
def __init__(
self, port, request_deserializers, response_serializers,
root_certificates, key_chain_pairs):
self._port = port
self._request_deserializers = request_deserializers
self._response_serializers = response_serializers
self._root_certificates = root_certificates
self._key_chain_pairs = key_chain_pairs
self._lock = threading.Lock()
self._pool = None
self._fore_link = None
self._rear_link = null.NULL_REAR_LINK
def join_rear_link(self, rear_link):
with self._lock:
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
if self._fore_link is not None:
self._fore_link.join_rear_link(rear_link)
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._fore_link = ForeLink(
self._pool, self._request_deserializers, self._response_serializers,
self._root_certificates, self._key_chain_pairs, port=self._port)
self._fore_link.join_rear_link(self._rear_link)
self._fore_link.start()
return self
def _stop(self):
with self._lock:
self._fore_link.stop()
self._fore_link = None
self._pool.shutdown(wait=True)
self._pool = None
def __enter__(self):
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def port(self):
with self._lock:
return None if self._fore_link is None else self._fore_link.port()
def accept_back_to_front_ticket(self, ticket):
with self._lock:
if self._fore_link is not None:
self._fore_link.accept_back_to_front_ticket(ticket)
def activated_fore_link(
port, request_deserializers, response_serializers, root_certificates,
key_chain_pairs):
"""Creates a ForeLink that is also an activated.Activated.
The returned object is only valid for use between calls to its start and stop
methods (or in context when used as a context manager).
Args:
port: The port on which to serve RPCs, or None for a port to be
automatically selected.
request_deserializers: A dictionary from RPC method names to request object
deserializer behaviors.
response_serializers: A dictionary from RPC method names to response object
serializer behaviors.
root_certificates: The PEM-encoded client root certificates as a bytestring
or None.
key_chain_pairs: A sequence of PEM-encoded private key-certificate chain
pairs.
"""
return _ActivatedForeLink(
port, request_deserializers, response_serializers, root_certificates,
key_chain_pairs)

@ -40,6 +40,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
_THREAD_POOL_SIZE = 100
_INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED,
@ -361,3 +364,75 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated):
else:
# NOTE(nathaniel): All other categories are treated as cancellation.
self._cancel(ticket.operation_id)
class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated):
def __init__(self, host, port, request_serializers, response_deserializers):
self._host = host
self._port = port
self._request_serializers = request_serializers
self._response_deserializers = response_deserializers
self._lock = threading.Lock()
self._pool = None
self._rear_link = None
self._fore_link = null.NULL_FORE_LINK
def join_fore_link(self, fore_link):
with self._lock:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._rear_link = RearLink(
self._host, self._port, self._pool, self._request_serializers,
self._response_deserializers)
self._rear_link.join_fore_link(self._fore_link)
self._rear_link.start()
return self
def _stop(self):
with self._lock:
self._rear_link.stop()
self._rear_link = None
self._pool.shutdown(wait=True)
self._pool = None
def __enter__(self):
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def accept_front_to_back_ticket(self, ticket):
with self._lock:
if self._rear_link is not None:
self._rear_link.accept_front_to_back_ticket(ticket)
def activated_rear_link(
host, port, request_serializers, response_deserializers):
"""Creates a RearLink that is also an activated.Activated.
The returned object is only valid for use between calls to its start and stop
methods (or in context when used as a context manager).
Args:
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
request_serializers: A dictionary from RPC method name to request object
serializer behavior.
response_deserializers: A dictionary from RPC method name to response
object deserializer behavior.
"""
return _ActivatedRearLink(
host, port, request_serializers, response_deserializers)

Loading…
Cancel
Save