From 154e762ae8f2711cbb7097616859f5ff9c677ecf Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Sun, 30 Aug 2015 18:44:56 +0000 Subject: [PATCH 1/2] Fix gRPC links lifecycle tracking --- src/python/grpcio/grpc/_links/invocation.py | 115 ++++++++++++-------- src/python/grpcio/grpc/_links/service.py | 80 +++++++++----- 2 files changed, 123 insertions(+), 72 deletions(-) diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py index ee3d72fdbc8..729b987dd12 100644 --- a/src/python/grpcio/grpc/_links/invocation.py +++ b/src/python/grpcio/grpc/_links/invocation.py @@ -41,6 +41,13 @@ from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay from grpc.framework.interfaces.links import links +_STOP = _intermediary_low.Event.Kind.STOP +_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED +_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED +_READ = _intermediary_low.Event.Kind.READ_ACCEPTED +_METADATA = _intermediary_low.Event.Kind.METADATA_ACCEPTED +_FINISH = _intermediary_low.Event.Kind.FINISH + @enum.unique class _Read(enum.Enum): @@ -67,7 +74,7 @@ class _RPCState(object): def __init__( self, call, request_serializer, response_deserializer, sequence_number, - read, allowance, high_write, low_write): + read, allowance, high_write, low_write, due): self.call = call self.request_serializer = request_serializer self.response_deserializer = response_deserializer @@ -76,6 +83,13 @@ class _RPCState(object): self.allowance = allowance self.high_write = high_write self.low_write = low_write + self.due = due + + +def _no_longer_due(kind, rpc_state, key, rpc_states): + rpc_state.due.remove(kind) + if not rpc_state.due: + del rpc_states[key] class _Kernel(object): @@ -91,12 +105,14 @@ class _Kernel(object): self._relay = ticket_relay self._completion_queue = None - self._rpc_states = None + self._rpc_states = {} self._pool = None def _on_write_event(self, operation_id, unused_event, rpc_state): if rpc_state.high_write is _HighWrite.CLOSED: rpc_state.call.complete(operation_id) + rpc_state.due.add(_COMPLETE) + rpc_state.due.remove(_WRITE) rpc_state.low_write = _LowWrite.CLOSED else: ticket = links.Ticket( @@ -105,16 +121,19 @@ class _Kernel(object): rpc_state.sequence_number += 1 self._relay.add_value(ticket) rpc_state.low_write = _LowWrite.OPEN + _no_longer_due(_WRITE, rpc_state, operation_id, self._rpc_states) def _on_read_event(self, operation_id, event, rpc_state): - if event.bytes is None: + if event.bytes is None or _FINISH not in rpc_state.due: rpc_state.read = _Read.CLOSED + _no_longer_due(_READ, rpc_state, operation_id, self._rpc_states) else: if 0 < rpc_state.allowance: rpc_state.allowance -= 1 rpc_state.call.read(operation_id) else: rpc_state.read = _Read.AWAITING_ALLOWANCE + _no_longer_due(_READ, rpc_state, operation_id, self._rpc_states) ticket = links.Ticket( operation_id, rpc_state.sequence_number, None, None, None, None, None, None, rpc_state.response_deserializer(event.bytes), None, None, None, @@ -123,18 +142,23 @@ class _Kernel(object): self._relay.add_value(ticket) def _on_metadata_event(self, operation_id, event, rpc_state): - rpc_state.allowance -= 1 - rpc_state.call.read(operation_id) - rpc_state.read = _Read.READING - ticket = links.Ticket( - operation_id, rpc_state.sequence_number, None, None, - links.Ticket.Subscription.FULL, None, None, event.metadata, None, None, - None, None, None, None) - rpc_state.sequence_number += 1 - self._relay.add_value(ticket) + if _FINISH in rpc_state.due: + rpc_state.allowance -= 1 + rpc_state.call.read(operation_id) + rpc_state.read = _Read.READING + rpc_state.due.add(_READ) + rpc_state.due.remove(_METADATA) + ticket = links.Ticket( + operation_id, rpc_state.sequence_number, None, None, + links.Ticket.Subscription.FULL, None, None, event.metadata, None, + None, None, None, None, None) + rpc_state.sequence_number += 1 + self._relay.add_value(ticket) + else: + _no_longer_due(_METADATA, rpc_state, operation_id, self._rpc_states) def _on_finish_event(self, operation_id, event, rpc_state): - self._rpc_states.pop(operation_id, None) + _no_longer_due(_FINISH, rpc_state, operation_id, self._rpc_states) if event.status.code is _intermediary_low.Code.OK: termination = links.Ticket.Termination.COMPLETION elif event.status.code is _intermediary_low.Code.CANCELLED: @@ -155,26 +179,26 @@ class _Kernel(object): def _spin(self, completion_queue): while True: event = completion_queue.get(None) - if event.kind is _intermediary_low.Event.Kind.STOP: - return - operation_id = event.tag with self._lock: - if self._completion_queue is None: - continue - rpc_state = self._rpc_states.get(operation_id) - if rpc_state is not None: - if event.kind is _intermediary_low.Event.Kind.WRITE_ACCEPTED: - self._on_write_event(operation_id, event, rpc_state) - elif event.kind is _intermediary_low.Event.Kind.METADATA_ACCEPTED: - self._on_metadata_event(operation_id, event, rpc_state) - elif event.kind is _intermediary_low.Event.Kind.READ_ACCEPTED: - self._on_read_event(operation_id, event, rpc_state) - elif event.kind is _intermediary_low.Event.Kind.FINISH: - self._on_finish_event(operation_id, event, rpc_state) - elif event.kind is _intermediary_low.Event.Kind.COMPLETE_ACCEPTED: - pass - else: - logging.error('Illegal RPC event! %s', (event,)) + rpc_state = self._rpc_states.get(event.tag, None) + if event.kind is _STOP: + pass + elif event.kind is _WRITE: + self._on_write_event(event.tag, event, rpc_state) + elif event.kind is _METADATA: + self._on_metadata_event(event.tag, event, rpc_state) + elif event.kind is _READ: + self._on_read_event(event.tag, event, rpc_state) + elif event.kind is _FINISH: + self._on_finish_event(event.tag, event, rpc_state) + elif event.kind is _COMPLETE: + _no_longer_due(_COMPLETE, rpc_state, event.tag, self._rpc_states) + else: + logging.error('Illegal RPC event! %s', (event,)) + + if self._completion_queue is None and not self._rpc_states: + completion_queue.stop() + return def _invoke( self, operation_id, group, method, initial_metadata, payload, termination, @@ -221,26 +245,31 @@ class _Kernel(object): if high_write is _HighWrite.CLOSED: call.complete(operation_id) low_write = _LowWrite.CLOSED + due = set((_METADATA, _COMPLETE, _FINISH,)) else: low_write = _LowWrite.OPEN + due = set((_METADATA, _FINISH,)) else: call.write(request_serializer(payload), operation_id) low_write = _LowWrite.ACTIVE + due = set((_WRITE, _METADATA, _FINISH,)) self._rpc_states[operation_id] = _RPCState( call, request_serializer, response_deserializer, 0, _Read.AWAITING_METADATA, 1 if allowance is None else (1 + allowance), - high_write, low_write) + high_write, low_write, due) def _advance(self, operation_id, rpc_state, payload, termination, allowance): if payload is not None: rpc_state.call.write(rpc_state.request_serializer(payload), operation_id) rpc_state.low_write = _LowWrite.ACTIVE + rpc_state.due.add(_WRITE) if allowance is not None: if rpc_state.read is _Read.AWAITING_ALLOWANCE: rpc_state.allowance += allowance - 1 rpc_state.call.read(operation_id) rpc_state.read = _Read.READING + rpc_state.due.add(_READ) else: rpc_state.allowance += allowance @@ -248,19 +277,21 @@ class _Kernel(object): rpc_state.high_write = _HighWrite.CLOSED if rpc_state.low_write is _LowWrite.OPEN: rpc_state.call.complete(operation_id) + rpc_state.due.add(_COMPLETE) rpc_state.low_write = _LowWrite.CLOSED elif termination is not None: rpc_state.call.cancel() def add_ticket(self, ticket): with self._lock: - if self._completion_queue is None: - return if ticket.sequence_number == 0: - self._invoke( - ticket.operation_id, ticket.group, ticket.method, - ticket.initial_metadata, ticket.payload, ticket.termination, - ticket.timeout, ticket.allowance) + if self._completion_queue is None: + logging.error('Received invocation ticket %s after stop!', ticket) + else: + self._invoke( + ticket.operation_id, ticket.group, ticket.method, + ticket.initial_metadata, ticket.payload, ticket.termination, + ticket.timeout, ticket.allowance) else: rpc_state = self._rpc_states.get(ticket.operation_id) if rpc_state is not None: @@ -276,7 +307,6 @@ class _Kernel(object): """ with self._lock: self._completion_queue = _intermediary_low.CompletionQueue() - self._rpc_states = {} self._pool = logging_pool.pool(1) self._pool.submit(self._spin, self._completion_queue) @@ -288,11 +318,10 @@ class _Kernel(object): has been called. """ with self._lock: - self._completion_queue.stop() + if not self._rpc_states: + self._completion_queue.stop() self._completion_queue = None pool = self._pool - self._pool = None - self._rpc_states = None pool.shutdown(wait=True) diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index c5ecc47cd9c..bbfe9bcd55f 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -53,6 +53,13 @@ _TERMINATION_KIND_TO_CODE = { links.Ticket.Termination.REMOTE_FAILURE: _intermediary_low.Code.UNKNOWN, } +_STOP = _intermediary_low.Event.Kind.STOP +_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED +_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED +_SERVICE = _intermediary_low.Event.Kind.SERVICE_ACCEPTED +_READ = _intermediary_low.Event.Kind.READ_ACCEPTED +_FINISH = _intermediary_low.Event.Kind.FINISH + @enum.unique class _Read(enum.Enum): @@ -84,7 +91,7 @@ class _RPCState(object): def __init__( self, request_deserializer, response_serializer, sequence_number, read, early_read, allowance, high_write, low_write, premetadataed, - terminal_metadata, code, message): + terminal_metadata, code, message, due): self.request_deserializer = request_deserializer self.response_serializer = response_serializer self.sequence_number = sequence_number @@ -99,6 +106,13 @@ class _RPCState(object): self.terminal_metadata = terminal_metadata self.code = code self.message = message + self.due = due + + +def _no_longer_due(kind, rpc_state, key, rpc_states): + rpc_state.due.remove(kind) + if not rpc_state.due: + del rpc_states[key] def _metadatafy(call, metadata): @@ -124,6 +138,7 @@ class _Kernel(object): self._relay = ticket_relay self._completion_queue = None + self._due = set() self._server = None self._rpc_states = {} self._pool = None @@ -149,7 +164,8 @@ class _Kernel(object): call.read(call) self._rpc_states[call] = _RPCState( request_deserializer, response_serializer, 1, _Read.READING, None, 1, - _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None) + _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None, + set((_READ, _FINISH,))) ticket = links.Ticket( call, 0, group, method, links.Ticket.Subscription.FULL, service_acceptance.deadline - time.time(), None, event.metadata, None, @@ -158,14 +174,13 @@ class _Kernel(object): def _on_read_event(self, event): call = event.tag - rpc_state = self._rpc_states.get(call, None) - if rpc_state is None: - return + rpc_state = self._rpc_states[call] if event.bytes is None: rpc_state.read = _Read.CLOSED payload = None termination = links.Ticket.Termination.COMPLETION + _no_longer_due(_READ, rpc_state, call, self._rpc_states) else: if 0 < rpc_state.allowance: payload = rpc_state.request_deserializer(event.bytes) @@ -174,6 +189,7 @@ class _Kernel(object): call.read(call) else: rpc_state.early_read = event.bytes + _no_longer_due(_READ, rpc_state, call, self._rpc_states) return # TODO(issue 2916): Instead of returning: # rpc_state.read = _Read.AWAITING_ALLOWANCE @@ -185,9 +201,7 @@ class _Kernel(object): def _on_write_event(self, event): call = event.tag - rpc_state = self._rpc_states.get(call, None) - if rpc_state is None: - return + rpc_state = self._rpc_states[call] if rpc_state.high_write is _HighWrite.CLOSED: if rpc_state.terminal_metadata is not None: @@ -197,6 +211,8 @@ class _Kernel(object): rpc_state.message) call.status(status, call) rpc_state.low_write = _LowWrite.CLOSED + rpc_state.due.add(_COMPLETE) + rpc_state.due.remove(_WRITE) else: ticket = links.Ticket( call, rpc_state.sequence_number, None, None, None, None, 1, None, @@ -204,12 +220,12 @@ class _Kernel(object): rpc_state.sequence_number += 1 self._relay.add_value(ticket) rpc_state.low_write = _LowWrite.OPEN + _no_longer_due(_WRITE, rpc_state, call, self._rpc_states) def _on_finish_event(self, event): call = event.tag - rpc_state = self._rpc_states.pop(call, None) - if rpc_state is None: - return + rpc_state = self._rpc_states[call] + _no_longer_due(_FINISH, rpc_state, call, self._rpc_states) code = event.status.code if code is _intermediary_low.Code.OK: return @@ -229,28 +245,33 @@ class _Kernel(object): def _spin(self, completion_queue, server): while True: event = completion_queue.get(None) - if event.kind is _intermediary_low.Event.Kind.STOP: - return with self._lock: - if self._server is None: - continue - elif event.kind is _intermediary_low.Event.Kind.SERVICE_ACCEPTED: - self._on_service_acceptance_event(event, server) - elif event.kind is _intermediary_low.Event.Kind.READ_ACCEPTED: + if event.kind is _STOP: + self._due.remove(_STOP) + elif event.kind is _READ: self._on_read_event(event) - elif event.kind is _intermediary_low.Event.Kind.WRITE_ACCEPTED: + elif event.kind is _WRITE: self._on_write_event(event) - elif event.kind is _intermediary_low.Event.Kind.COMPLETE_ACCEPTED: - pass + elif event.kind is _COMPLETE: + _no_longer_due( + _COMPLETE, self._rpc_states.get(event.tag), event.tag, + self._rpc_states) elif event.kind is _intermediary_low.Event.Kind.FINISH: self._on_finish_event(event) + elif event.kind is _SERVICE: + if self._server is None: + self._due.remove(_SERVICE) + else: + self._on_service_acceptance_event(event, server) else: logging.error('Illegal event! %s', (event,)) + if not self._due and not self._rpc_states: + completion_queue.stop() + return + def add_ticket(self, ticket): with self._lock: - if self._server is None: - return call = ticket.operation_id rpc_state = self._rpc_states.get(call) if rpc_state is None: @@ -278,6 +299,7 @@ class _Kernel(object): rpc_state.early_read = None if rpc_state.read is _Read.READING: call.read(call) + rpc_state.due.add(_READ) termination = None else: termination = links.Ticket.Termination.COMPLETION @@ -289,6 +311,7 @@ class _Kernel(object): if ticket.payload is not None: call.write(rpc_state.response_serializer(ticket.payload), call) + rpc_state.due.add(_WRITE) rpc_state.low_write = _LowWrite.ACTIVE if ticket.terminal_metadata is not None: @@ -307,6 +330,7 @@ class _Kernel(object): links.Ticket.Termination.COMPLETION, rpc_state.code, rpc_state.message) call.status(status, call) + rpc_state.due.add(_COMPLETE) rpc_state.low_write = _LowWrite.CLOSED elif ticket.termination is not None: if rpc_state.terminal_metadata is not None: @@ -314,7 +338,7 @@ class _Kernel(object): status = _status( ticket.termination, rpc_state.code, rpc_state.message) call.status(status, call) - self._rpc_states.pop(call, None) + rpc_state.due.add(_COMPLETE) def add_port(self, address, server_credentials): with self._lock: @@ -335,19 +359,17 @@ class _Kernel(object): self._pool.submit(self._spin, self._completion_queue, self._server) self._server.start() self._server.service(None) + self._due.add(_SERVICE) def begin_stop(self): with self._lock: self._server.stop() + self._due.add(_STOP) self._server = None def end_stop(self): with self._lock: - self._completion_queue.stop() - self._completion_queue = None pool = self._pool - self._pool = None - self._rpc_states = None pool.shutdown(wait=True) @@ -369,7 +391,7 @@ class ServiceLink(links.Link): None for insecure service. Returns: - A integer port on which RPCs will be serviced after this link has been + An integer port on which RPCs will be serviced after this link has been started. This is typically the same number as the port number contained in the passed address, but will likely be different if the port number contained in the passed address was zero. From dff73c7d23fe6fa405b7311696e8f3c4b8484e8e Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Sun, 30 Aug 2015 20:09:33 +0000 Subject: [PATCH 2/2] Drop invoker during test tear-down The invoker is an object derived from, and referring to, objects of the Face implementation under test. If those objects are to be garbage collected at the appropriate time the invoker that references them must be made eligible for garbage collection in the test's tearDown method. --- .../interfaces/face/_blocking_invocation_inline_service.py | 1 + .../face/_event_invocation_synchronous_event_service.py | 1 + .../face/_future_invocation_asynchronous_event_service.py | 1 + 3 files changed, 3 insertions(+) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py index 8804f3f2233..b7dd5d4d171 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -73,6 +73,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): Overriding implementations must call this implementation. """ + self._invoker = None self.implementation.destantiate(self._memo) def testSuccessfulUnaryRequestUnaryResponse(self): diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py index 5a78b4bed24..7cb273bf787 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py @@ -74,6 +74,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): Overriding implementations must call this implementation. """ + self._invoker = None self.implementation.destantiate(self._memo) def testSuccessfulUnaryRequestUnaryResponse(self): diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index d1107e1576d..272a37f15f8 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -103,6 +103,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): Overriding implementations must call this implementation. """ + self._invoker = None self.implementation.destantiate(self._memo) self._digest_pool.shutdown(wait=True)