Merge pull request #4269 from nathanielmanistaatgoogle/lifecycle

Stub and server lifecycle fixes
pull/4282/head
Masood Malekghassemi 9 years ago
commit 31c16e526f
  1. 175
      src/python/grpcio/grpc/beta/_server.py
  2. 124
      src/python/grpcio/grpc/beta/_stub.py
  3. 60
      src/python/grpcio/grpc/framework/core/_end.py
  4. 73
      src/python/grpcio_test/grpc_test/beta/_beta_features_test.py

@ -44,6 +44,12 @@ _DEFAULT_TIMEOUT = 300
_MAXIMUM_TIMEOUT = 24 * 60 * 60
def _set_event():
event = threading.Event()
event.set()
return event
class _GRPCServicer(base.Servicer):
def __init__(self, delegate):
@ -61,86 +67,143 @@ class _GRPCServicer(base.Servicer):
raise
def _disassemble(grpc_link, end_link, pool, event, grace):
grpc_link.begin_stop()
end_link.stop(grace).wait()
grpc_link.end_stop()
grpc_link.join_link(utilities.NULL_LINK)
end_link.join_link(utilities.NULL_LINK)
if pool is not None:
pool.shutdown(wait=True)
event.set()
class _Server(interfaces.Server):
def __init__(
self, implementations, multi_implementation, pool, pool_size,
default_timeout, maximum_timeout, grpc_link):
self._lock = threading.Lock()
self._implementations = implementations
self._multi_implementation = multi_implementation
self._customer_pool = pool
self._pool_size = pool_size
self._default_timeout = default_timeout
self._maximum_timeout = maximum_timeout
self._grpc_link = grpc_link
class Server(interfaces.Server):
self._end_link = None
self._stop_events = None
self._pool = None
def __init__(self, grpc_link, end_link, pool):
self._grpc_link = grpc_link
self._end_link = end_link
self._pool = pool
def _start(self):
with self._lock:
if self._end_link is not None:
raise ValueError('Cannot start already-started server!')
if self._customer_pool is None:
self._pool = logging_pool.pool(self._pool_size)
assembly_pool = self._pool
else:
assembly_pool = self._customer_pool
servicer = _GRPCServicer(
_crust_implementations.servicer(
self._implementations, self._multi_implementation, assembly_pool))
self._end_link = _core_implementations.service_end_link(
servicer, self._default_timeout, self._maximum_timeout)
self._grpc_link.join_link(self._end_link)
self._end_link.join_link(self._grpc_link)
self._grpc_link.start()
self._end_link.start()
def _dissociate_links_and_shut_down_pool(self):
self._grpc_link.end_stop()
self._grpc_link.join_link(utilities.NULL_LINK)
self._end_link.join_link(utilities.NULL_LINK)
self._end_link = None
if self._pool is not None:
self._pool.shutdown(wait=True)
self._pool = None
def _stop_stopping(self):
self._dissociate_links_and_shut_down_pool()
for stop_event in self._stop_events:
stop_event.set()
self._stop_events = None
def _stop_started(self):
self._grpc_link.begin_stop()
self._end_link.stop(0).wait()
self._dissociate_links_and_shut_down_pool()
def _foreign_thread_stop(self, end_stop_event, stop_events):
end_stop_event.wait()
with self._lock:
if self._stop_events is stop_events:
self._stop_stopping()
def _schedule_stop(self, grace):
with self._lock:
if self._end_link is None:
return _set_event()
server_stop_event = threading.Event()
if self._stop_events is None:
self._stop_events = [server_stop_event]
self._grpc_link.begin_stop()
else:
self._stop_events.append(server_stop_event)
end_stop_event = self._end_link.stop(grace)
end_stop_thread = threading.Thread(
target=self._foreign_thread_stop,
args=(end_stop_event, self._stop_events))
end_stop_thread.start()
return server_stop_event
def _stop_now(self):
with self._lock:
if self._end_link is not None:
if self._stop_events is None:
self._stop_started()
else:
self._stop_stopping()
def add_insecure_port(self, address):
return self._grpc_link.add_port(address, None)
with self._lock:
if self._end_link is None:
return self._grpc_link.add_port(address, None)
else:
raise ValueError('Can\'t add port to serving server!')
def add_secure_port(self, address, server_credentials):
return self._grpc_link.add_port(
address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
def _start(self):
self._grpc_link.join_link(self._end_link)
self._end_link.join_link(self._grpc_link)
self._grpc_link.start()
self._end_link.start()
def _stop(self, grace):
stop_event = threading.Event()
if 0 < grace:
disassembly_thread = threading.Thread(
target=_disassemble,
args=(
self._grpc_link, self._end_link, self._pool, stop_event, grace,))
disassembly_thread.start()
return stop_event
else:
_disassemble(self._grpc_link, self._end_link, self._pool, stop_event, 0)
return stop_event
with self._lock:
if self._end_link is None:
return self._grpc_link.add_port(
address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
else:
raise ValueError('Can\'t add port to serving server!')
def start(self):
self._start()
def stop(self, grace):
return self._stop(grace)
if 0 < grace:
return self._schedule_stop(grace)
else:
self._stop_now()
return _set_event()
def __enter__(self):
self._start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop(0).wait()
self._stop_now()
return False
def __del__(self):
self._stop_now()
def server(
implementations, multi_implementation, request_deserializers,
response_serializers, thread_pool, thread_pool_size, default_timeout,
maximum_timeout):
if thread_pool is None:
service_thread_pool = logging_pool.pool(
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
assembly_thread_pool = service_thread_pool
else:
service_thread_pool = thread_pool
assembly_thread_pool = None
servicer = _GRPCServicer(
_crust_implementations.servicer(
implementations, multi_implementation, service_thread_pool))
grpc_link = service.service_link(request_deserializers, response_serializers)
end_link = _core_implementations.service_end_link(
servicer,
return _Server(
implementations, multi_implementation, thread_pool,
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size,
_DEFAULT_TIMEOUT if default_timeout is None else default_timeout,
_MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout)
return Server(grpc_link, end_link, assembly_thread_pool)
_MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout,
grpc_link)

@ -42,76 +42,114 @@ _DEFAULT_POOL_SIZE = 6
class _AutoIntermediary(object):
def __init__(self, delegate, on_deletion):
def __init__(self, up, down, delegate):
self._lock = threading.Lock()
self._up = up
self._down = down
self._in_context = False
self._delegate = delegate
self._on_deletion = on_deletion
def __getattr__(self, attr):
return getattr(self._delegate, attr)
with self._lock:
if self._delegate is None:
raise AttributeError('No useful attributes out of context!')
else:
return getattr(self._delegate, attr)
def __enter__(self):
return self
with self._lock:
if self._in_context:
raise ValueError('Already in context!')
elif self._delegate is None:
self._delegate = self._up()
self._in_context = True
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return False
with self._lock:
if not self._in_context:
raise ValueError('Not in context!')
self._down()
self._in_context = False
self._delegate = None
return False
def __del__(self):
self._on_deletion()
with self._lock:
if self._delegate is not None:
self._down()
self._delegate = None
class _StubAssemblyManager(object):
def __init__(
self, thread_pool, thread_pool_size, end_link, grpc_link, stub_creator):
self._thread_pool = thread_pool
self._pool_size = thread_pool_size
self._end_link = end_link
self._grpc_link = grpc_link
self._stub_creator = stub_creator
self._own_pool = None
def up(self):
if self._thread_pool is None:
self._own_pool = logging_pool.pool(
_DEFAULT_POOL_SIZE if self._pool_size is None else self._pool_size)
assembly_pool = self._own_pool
else:
assembly_pool = self._thread_pool
self._end_link.join_link(self._grpc_link)
self._grpc_link.join_link(self._end_link)
self._end_link.start()
self._grpc_link.start()
return self._stub_creator(self._end_link, assembly_pool)
def down(self):
self._end_link.stop(0).wait()
self._grpc_link.stop()
self._end_link.join_link(utilities.NULL_LINK)
self._grpc_link.join_link(utilities.NULL_LINK)
if self._own_pool is not None:
self._own_pool.shutdown(wait=True)
self._own_pool = None
def _assemble(
channel, host, metadata_transformer, request_serializers,
response_deserializers, thread_pool, thread_pool_size):
response_deserializers, thread_pool, thread_pool_size, stub_creator):
end_link = _core_implementations.invocation_end_link()
grpc_link = invocation.invocation_link(
channel, host, metadata_transformer, request_serializers,
response_deserializers)
if thread_pool is None:
invocation_pool = logging_pool.pool(
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
assembly_pool = invocation_pool
else:
invocation_pool = thread_pool
assembly_pool = None
end_link.join_link(grpc_link)
grpc_link.join_link(end_link)
end_link.start()
grpc_link.start()
return end_link, grpc_link, invocation_pool, assembly_pool
def _disassemble(end_link, grpc_link, pool):
end_link.stop(24 * 60 * 60).wait()
grpc_link.stop()
end_link.join_link(utilities.NULL_LINK)
grpc_link.join_link(utilities.NULL_LINK)
if pool is not None:
pool.shutdown(wait=True)
def _wrap_assembly(stub, end_link, grpc_link, assembly_pool):
disassembly_thread = threading.Thread(
target=_disassemble, args=(end_link, grpc_link, assembly_pool))
return _AutoIntermediary(stub, disassembly_thread.start)
stub_assembly_manager = _StubAssemblyManager(
thread_pool, thread_pool_size, end_link, grpc_link, stub_creator)
stub = stub_assembly_manager.up()
return _AutoIntermediary(
stub_assembly_manager.up, stub_assembly_manager.down, stub)
def _dynamic_stub_creator(service, cardinalities):
def create_dynamic_stub(end_link, invocation_pool):
return _crust_implementations.dynamic_stub(
end_link, service, cardinalities, invocation_pool)
return create_dynamic_stub
def generic_stub(
channel, host, metadata_transformer, request_serializers,
response_deserializers, thread_pool, thread_pool_size):
end_link, grpc_link, invocation_pool, assembly_pool = _assemble(
return _assemble(
channel, host, metadata_transformer, request_serializers,
response_deserializers, thread_pool, thread_pool_size)
stub = _crust_implementations.generic_stub(end_link, invocation_pool)
return _wrap_assembly(stub, end_link, grpc_link, assembly_pool)
response_deserializers, thread_pool, thread_pool_size,
_crust_implementations.generic_stub)
def dynamic_stub(
channel, host, service, cardinalities, metadata_transformer,
request_serializers, response_deserializers, thread_pool,
thread_pool_size):
end_link, grpc_link, invocation_pool, assembly_pool = _assemble(
return _assemble(
channel, host, metadata_transformer, request_serializers,
response_deserializers, thread_pool, thread_pool_size)
stub = _crust_implementations.dynamic_stub(
end_link, service, cardinalities, invocation_pool)
return _wrap_assembly(stub, end_link, grpc_link, assembly_pool)
response_deserializers, thread_pool, thread_pool_size,
_dynamic_stub_creator(service, cardinalities))

@ -85,35 +85,6 @@ def _future_shutdown(lock, cycle, event):
return in_future
def _termination_action(lock, stats, operation_id, cycle):
"""Constructs the termination action for a single operation.
Args:
lock: A lock to hold during the termination action.
stats: A mapping from base.Outcome.Kind values to integers to increment
with the outcome kind given to the termination action.
operation_id: The operation ID for the termination action.
cycle: A _Cycle value to be updated during the termination action.
Returns:
A callable that takes an operation outcome kind as its sole parameter and
that should be used as the termination action for the operation
associated with the given operation ID.
"""
def termination_action(outcome_kind):
with lock:
stats[outcome_kind] += 1
cycle.operations.pop(operation_id, None)
if not cycle.operations:
for action in cycle.idle_actions:
cycle.pool.submit(action)
cycle.idle_actions = []
if cycle.grace:
_cancel_futures(cycle.futures)
cycle.pool.shutdown(wait=False)
return termination_action
class _End(End):
"""An End implementation."""
@ -133,6 +104,31 @@ class _End(End):
self._cycle = None
def _termination_action(self, operation_id):
"""Constructs the termination action for a single operation.
Args:
operation_id: The operation ID for the termination action.
Returns:
A callable that takes an operation outcome kind as its sole parameter and
that should be used as the termination action for the operation
associated with the given operation ID.
"""
def termination_action(outcome_kind):
with self._lock:
self._stats[outcome_kind] += 1
self._cycle.operations.pop(operation_id, None)
if not self._cycle.operations:
for action in self._cycle.idle_actions:
self._cycle.pool.submit(action)
self._cycle.idle_actions = []
if self._cycle.grace:
_cancel_futures(self._cycle.futures)
self._cycle.pool.shutdown(wait=False)
self._cycle = None
return termination_action
def start(self):
"""See base.End.start for specification."""
with self._lock:
@ -174,8 +170,7 @@ class _End(End):
with self._lock:
if self._cycle is None or self._cycle.grace:
raise ValueError('Can\'t operate on stopped or stopping End!')
termination_action = _termination_action(
self._lock, self._stats, operation_id, self._cycle)
termination_action = self._termination_action(operation_id)
operation = _operation.invocation_operate(
operation_id, group, method, subscription, timeout, protocol_options,
initial_metadata, payload, completion, self._mate.accept_ticket,
@ -208,8 +203,7 @@ class _End(End):
if operation is not None:
operation.handle_ticket(ticket)
elif self._servicer_package is not None and not self._cycle.grace:
termination_action = _termination_action(
self._lock, self._stats, ticket.operation_id, self._cycle)
termination_action = self._termination_action(ticket.operation_id)
operation = _operation.service_operate(
self._servicer_package, ticket, self._mate.accept_ticket,
termination_action, self._cycle.pool)

@ -224,5 +224,78 @@ class BetaFeaturesTest(unittest.TestCase):
self.assertEqual(_RESPONSE, response)
class ContextManagementAndLifecycleTest(unittest.TestCase):
def setUp(self):
self._servicer = _Servicer()
self._method_implementations = {
(_GROUP, _UNARY_UNARY):
utilities.unary_unary_inline(self._servicer.unary_unary),
(_GROUP, _UNARY_STREAM):
utilities.unary_stream_inline(self._servicer.unary_stream),
(_GROUP, _STREAM_UNARY):
utilities.stream_unary_inline(self._servicer.stream_unary),
(_GROUP, _STREAM_STREAM):
utilities.stream_stream_inline(self._servicer.stream_stream),
}
self._cardinalities = {
_UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
_UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
_STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
_STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
}
self._server_options = implementations.server_options(
thread_pool_size=test_constants.POOL_SIZE)
self._server_credentials = implementations.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain(),),])
self._client_credentials = implementations.ssl_client_credentials(
resources.test_root_certificates(), None, None)
self._stub_options = implementations.stub_options(
thread_pool_size=test_constants.POOL_SIZE)
def test_stub_context(self):
server = implementations.server(
self._method_implementations, options=self._server_options)
port = server.add_secure_port('[::]:0', self._server_credentials)
server.start()
channel = test_utilities.not_really_secure_channel(
'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE)
dynamic_stub = implementations.dynamic_stub(
channel, _GROUP, self._cardinalities, options=self._stub_options)
for _ in range(100):
with dynamic_stub:
pass
for _ in range(10):
with dynamic_stub:
call_options = interfaces.grpc_call_options(
disable_compression=True)
response = getattr(dynamic_stub, _UNARY_UNARY)(
_REQUEST, test_constants.LONG_TIMEOUT,
protocol_options=call_options)
self.assertEqual(_RESPONSE, response)
self.assertIsNotNone(self._servicer.peer())
server.stop(test_constants.SHORT_TIMEOUT).wait()
def test_server_lifecycle(self):
for _ in range(100):
server = implementations.server(
self._method_implementations, options=self._server_options)
port = server.add_secure_port('[::]:0', self._server_credentials)
server.start()
server.stop(test_constants.SHORT_TIMEOUT).wait()
for _ in range(100):
server = implementations.server(
self._method_implementations, options=self._server_options)
server.add_secure_port('[::]:0', self._server_credentials)
server.add_insecure_port('[::]:0')
with server:
server.stop(test_constants.SHORT_TIMEOUT)
server.stop(test_constants.SHORT_TIMEOUT)
if __name__ == '__main__':
unittest.main(verbosity=2)

Loading…
Cancel
Save