|
|
|
@ -44,6 +44,12 @@ _DEFAULT_TIMEOUT = 300 |
|
|
|
|
_MAXIMUM_TIMEOUT = 24 * 60 * 60 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _set_event(): |
|
|
|
|
event = threading.Event() |
|
|
|
|
event.set() |
|
|
|
|
return event |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _GRPCServicer(base.Servicer): |
|
|
|
|
|
|
|
|
|
def __init__(self, delegate): |
|
|
|
@ -61,86 +67,143 @@ class _GRPCServicer(base.Servicer): |
|
|
|
|
raise |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _disassemble(grpc_link, end_link, pool, event, grace): |
|
|
|
|
grpc_link.begin_stop() |
|
|
|
|
end_link.stop(grace).wait() |
|
|
|
|
grpc_link.end_stop() |
|
|
|
|
grpc_link.join_link(utilities.NULL_LINK) |
|
|
|
|
end_link.join_link(utilities.NULL_LINK) |
|
|
|
|
if pool is not None: |
|
|
|
|
pool.shutdown(wait=True) |
|
|
|
|
event.set() |
|
|
|
|
class _Server(interfaces.Server): |
|
|
|
|
|
|
|
|
|
def __init__( |
|
|
|
|
self, implementations, multi_implementation, pool, pool_size, |
|
|
|
|
default_timeout, maximum_timeout, grpc_link): |
|
|
|
|
self._lock = threading.Lock() |
|
|
|
|
self._implementations = implementations |
|
|
|
|
self._multi_implementation = multi_implementation |
|
|
|
|
self._customer_pool = pool |
|
|
|
|
self._pool_size = pool_size |
|
|
|
|
self._default_timeout = default_timeout |
|
|
|
|
self._maximum_timeout = maximum_timeout |
|
|
|
|
self._grpc_link = grpc_link |
|
|
|
|
|
|
|
|
|
class Server(interfaces.Server): |
|
|
|
|
self._end_link = None |
|
|
|
|
self._stop_events = None |
|
|
|
|
self._pool = None |
|
|
|
|
|
|
|
|
|
def __init__(self, grpc_link, end_link, pool): |
|
|
|
|
self._grpc_link = grpc_link |
|
|
|
|
self._end_link = end_link |
|
|
|
|
self._pool = pool |
|
|
|
|
def _start(self): |
|
|
|
|
with self._lock: |
|
|
|
|
if self._end_link is not None: |
|
|
|
|
raise ValueError('Cannot start already-started server!') |
|
|
|
|
|
|
|
|
|
def add_insecure_port(self, address): |
|
|
|
|
return self._grpc_link.add_port(address, None) |
|
|
|
|
if self._customer_pool is None: |
|
|
|
|
self._pool = logging_pool.pool(self._pool_size) |
|
|
|
|
assembly_pool = self._pool |
|
|
|
|
else: |
|
|
|
|
assembly_pool = self._customer_pool |
|
|
|
|
|
|
|
|
|
def add_secure_port(self, address, server_credentials): |
|
|
|
|
return self._grpc_link.add_port( |
|
|
|
|
address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access |
|
|
|
|
servicer = _GRPCServicer( |
|
|
|
|
_crust_implementations.servicer( |
|
|
|
|
self._implementations, self._multi_implementation, assembly_pool)) |
|
|
|
|
|
|
|
|
|
self._end_link = _core_implementations.service_end_link( |
|
|
|
|
servicer, self._default_timeout, self._maximum_timeout) |
|
|
|
|
|
|
|
|
|
def _start(self): |
|
|
|
|
self._grpc_link.join_link(self._end_link) |
|
|
|
|
self._end_link.join_link(self._grpc_link) |
|
|
|
|
self._grpc_link.start() |
|
|
|
|
self._end_link.start() |
|
|
|
|
|
|
|
|
|
def _stop(self, grace): |
|
|
|
|
stop_event = threading.Event() |
|
|
|
|
if 0 < grace: |
|
|
|
|
disassembly_thread = threading.Thread( |
|
|
|
|
target=_disassemble, |
|
|
|
|
args=( |
|
|
|
|
self._grpc_link, self._end_link, self._pool, stop_event, grace,)) |
|
|
|
|
disassembly_thread.start() |
|
|
|
|
return stop_event |
|
|
|
|
def _dissociate_links_and_shut_down_pool(self): |
|
|
|
|
self._grpc_link.end_stop() |
|
|
|
|
self._grpc_link.join_link(utilities.NULL_LINK) |
|
|
|
|
self._end_link.join_link(utilities.NULL_LINK) |
|
|
|
|
self._end_link = None |
|
|
|
|
if self._pool is not None: |
|
|
|
|
self._pool.shutdown(wait=True) |
|
|
|
|
self._pool = None |
|
|
|
|
|
|
|
|
|
def _stop_stopping(self): |
|
|
|
|
self._dissociate_links_and_shut_down_pool() |
|
|
|
|
for stop_event in self._stop_events: |
|
|
|
|
stop_event.set() |
|
|
|
|
self._stop_events = None |
|
|
|
|
|
|
|
|
|
def _stop_started(self): |
|
|
|
|
self._grpc_link.begin_stop() |
|
|
|
|
self._end_link.stop(0).wait() |
|
|
|
|
self._dissociate_links_and_shut_down_pool() |
|
|
|
|
|
|
|
|
|
def _foreign_thread_stop(self, end_stop_event, stop_events): |
|
|
|
|
end_stop_event.wait() |
|
|
|
|
with self._lock: |
|
|
|
|
if self._stop_events is stop_events: |
|
|
|
|
self._stop_stopping() |
|
|
|
|
|
|
|
|
|
def _schedule_stop(self, grace): |
|
|
|
|
with self._lock: |
|
|
|
|
if self._end_link is None: |
|
|
|
|
return _set_event() |
|
|
|
|
server_stop_event = threading.Event() |
|
|
|
|
if self._stop_events is None: |
|
|
|
|
self._stop_events = [server_stop_event] |
|
|
|
|
self._grpc_link.begin_stop() |
|
|
|
|
else: |
|
|
|
|
self._stop_events.append(server_stop_event) |
|
|
|
|
end_stop_event = self._end_link.stop(grace) |
|
|
|
|
end_stop_thread = threading.Thread( |
|
|
|
|
target=self._foreign_thread_stop, |
|
|
|
|
args=(end_stop_event, self._stop_events)) |
|
|
|
|
end_stop_thread.start() |
|
|
|
|
return server_stop_event |
|
|
|
|
|
|
|
|
|
def _stop_now(self): |
|
|
|
|
with self._lock: |
|
|
|
|
if self._end_link is not None: |
|
|
|
|
if self._stop_events is None: |
|
|
|
|
self._stop_started() |
|
|
|
|
else: |
|
|
|
|
_disassemble(self._grpc_link, self._end_link, self._pool, stop_event, 0) |
|
|
|
|
return stop_event |
|
|
|
|
self._stop_stopping() |
|
|
|
|
|
|
|
|
|
def add_insecure_port(self, address): |
|
|
|
|
with self._lock: |
|
|
|
|
if self._end_link is None: |
|
|
|
|
return self._grpc_link.add_port(address, None) |
|
|
|
|
else: |
|
|
|
|
raise ValueError('Can\'t add port to serving server!') |
|
|
|
|
|
|
|
|
|
def add_secure_port(self, address, server_credentials): |
|
|
|
|
with self._lock: |
|
|
|
|
if self._end_link is None: |
|
|
|
|
return self._grpc_link.add_port( |
|
|
|
|
address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access |
|
|
|
|
else: |
|
|
|
|
raise ValueError('Can\'t add port to serving server!') |
|
|
|
|
|
|
|
|
|
def start(self): |
|
|
|
|
self._start() |
|
|
|
|
|
|
|
|
|
def stop(self, grace): |
|
|
|
|
return self._stop(grace) |
|
|
|
|
if 0 < grace: |
|
|
|
|
return self._schedule_stop(grace) |
|
|
|
|
else: |
|
|
|
|
self._stop_now() |
|
|
|
|
return _set_event() |
|
|
|
|
|
|
|
|
|
def __enter__(self): |
|
|
|
|
self._start() |
|
|
|
|
return self |
|
|
|
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
|
|
|
self._stop(0).wait() |
|
|
|
|
self._stop_now() |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
def __del__(self): |
|
|
|
|
self._stop_now() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def server( |
|
|
|
|
implementations, multi_implementation, request_deserializers, |
|
|
|
|
response_serializers, thread_pool, thread_pool_size, default_timeout, |
|
|
|
|
maximum_timeout): |
|
|
|
|
if thread_pool is None: |
|
|
|
|
service_thread_pool = logging_pool.pool( |
|
|
|
|
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size) |
|
|
|
|
assembly_thread_pool = service_thread_pool |
|
|
|
|
else: |
|
|
|
|
service_thread_pool = thread_pool |
|
|
|
|
assembly_thread_pool = None |
|
|
|
|
|
|
|
|
|
servicer = _GRPCServicer( |
|
|
|
|
_crust_implementations.servicer( |
|
|
|
|
implementations, multi_implementation, service_thread_pool)) |
|
|
|
|
|
|
|
|
|
grpc_link = service.service_link(request_deserializers, response_serializers) |
|
|
|
|
|
|
|
|
|
end_link = _core_implementations.service_end_link( |
|
|
|
|
servicer, |
|
|
|
|
return _Server( |
|
|
|
|
implementations, multi_implementation, thread_pool, |
|
|
|
|
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size, |
|
|
|
|
_DEFAULT_TIMEOUT if default_timeout is None else default_timeout, |
|
|
|
|
_MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout) |
|
|
|
|
|
|
|
|
|
return Server(grpc_link, end_link, assembly_thread_pool) |
|
|
|
|
_MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout, |
|
|
|
|
grpc_link) |
|
|
|
|