Make ServiceLink shut-down a two step process

pull/3126/head
Nathaniel Manista 9 years ago
parent 400e0cdd32
commit 4354f3e680
  1. 33
      src/python/grpcio/grpc/_links/service.py
  2. 3
      src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
  3. 3
      src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
  4. 9
      src/python/grpcio_test/grpc_test/_links/_transmission_test.py

@ -337,10 +337,13 @@ class _Kernel(object):
self._server.start() self._server.start()
self._server.service(None) self._server.service(None)
def graceful_stop(self): def begin_stop(self):
with self._lock: with self._lock:
self._server.stop() self._server.stop()
self._server = None self._server = None
def end_stop(self):
with self._lock:
self._completion_queue.stop() self._completion_queue.stop()
self._completion_queue = None self._completion_queue = None
pool = self._pool pool = self._pool
@ -348,11 +351,6 @@ class _Kernel(object):
self._rpc_states = None self._rpc_states = None
pool.shutdown(wait=True) pool.shutdown(wait=True)
def immediate_stop(self):
# TODO(nathaniel): Implementation.
raise NotImplementedError(
'TODO(nathaniel): after merge of rewritten lower layers')
class ServiceLink(links.Link): class ServiceLink(links.Link):
"""A links.Link for use on the service-side of a gRPC connection. """A links.Link for use on the service-side of a gRPC connection.
@ -386,18 +384,20 @@ class ServiceLink(links.Link):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def stop_gracefully(self): def begin_stop(self):
"""Stops this link. """Indicate imminent link stop and immediate rejection of new RPCs.
New RPCs will be rejected as soon as this method is called, but ongoing RPCs New RPCs will be rejected as soon as this method is called, but ongoing RPCs
will be allowed to continue until they terminate. This method blocks until will be allowed to continue until they terminate. This method does not
all RPCs have terminated. block.
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def stop_immediately(self): def end_stop(self):
"""Stops this link. """Finishes stopping this link.
begin_stop must have been called exactly once before calling this method.
All in-progress RPCs will be terminated immediately. All in-progress RPCs will be terminated immediately.
""" """
@ -424,12 +424,11 @@ class _ServiceLink(ServiceLink):
self._relay.start() self._relay.start()
return self._kernel.start() return self._kernel.start()
def stop_gracefully(self): def begin_stop(self):
self._kernel.graceful_stop() self._kernel.begin_stop()
self._relay.stop()
def stop_immediately(self): def end_stop(self):
self._kernel.immediate_stop() self._kernel.end_stop()
self._relay.stop() self._relay.stop()

@ -114,7 +114,8 @@ class _Implementation(test_interfaces.Implementation):
def destantiate(self, memo): def destantiate(self, memo):
invocation_grpc_link, service_grpc_link = memo invocation_grpc_link, service_grpc_link = memo
invocation_grpc_link.stop() invocation_grpc_link.stop()
service_grpc_link.stop_gracefully() service_grpc_link.begin_stop()
service_grpc_link.end_stop()
def invocation_initial_metadata(self): def invocation_initial_metadata(self):
return _INVOCATION_INITIAL_METADATA return _INVOCATION_INITIAL_METADATA

@ -121,8 +121,9 @@ class _Implementation(test_interfaces.Implementation):
service_end_link, pool) = memo service_end_link, pool) = memo
invocation_end_link.stop(0).wait() invocation_end_link.stop(0).wait()
invocation_grpc_link.stop() invocation_grpc_link.stop()
service_grpc_link.stop_gracefully() service_grpc_link.begin_stop()
service_end_link.stop(0).wait() service_end_link.stop(0).wait()
service_grpc_link.end_stop()
invocation_end_link.join_link(utilities.NULL_LINK) invocation_end_link.join_link(utilities.NULL_LINK)
invocation_grpc_link.join_link(utilities.NULL_LINK) invocation_grpc_link.join_link(utilities.NULL_LINK)
service_grpc_link.join_link(utilities.NULL_LINK) service_grpc_link.join_link(utilities.NULL_LINK)

@ -62,7 +62,8 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
def destroy_transmitting_links(self, invocation_side_link, service_side_link): def destroy_transmitting_links(self, invocation_side_link, service_side_link):
invocation_side_link.stop() invocation_side_link.stop()
service_side_link.stop_gracefully() service_side_link.begin_stop()
service_side_link.end_stop()
def create_invocation_initial_metadata(self): def create_invocation_initial_metadata(self):
return ( return (
@ -140,7 +141,8 @@ class RoundTripTest(unittest.TestCase):
invocation_mate.block_until_tickets_satisfy(test_cases.terminated) invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop() invocation_link.stop()
service_link.stop_gracefully() service_link.begin_stop()
service_link.end_stop()
self.assertIs( self.assertIs(
service_mate.tickets()[-1].termination, service_mate.tickets()[-1].termination,
@ -206,7 +208,8 @@ class RoundTripTest(unittest.TestCase):
invocation_mate.block_until_tickets_satisfy(test_cases.terminated) invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
invocation_link.stop() invocation_link.stop()
service_link.stop_gracefully() service_link.begin_stop()
service_link.end_stop()
observed_requests = tuple( observed_requests = tuple(
ticket.payload for ticket in service_mate.tickets() ticket.payload for ticket in service_mate.tickets()

Loading…
Cancel
Save