Merge pull request #657 from nathanielmanistaatgoogle/thread-leak-fix

pull/661/head
Nathaniel Manista 10 years ago
commit b472e2364a
  1. 2
      src/python/src/grpc/framework/base/packets/_ends.py
  2. 6
      src/python/src/grpc/framework/base/packets/_interfaces.py
  3. 5
      src/python/src/grpc/framework/base/packets/_termination.py
  4. 9
      src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py

@ -215,6 +215,7 @@ def _front_operate(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers(
@ -340,6 +341,7 @@ def _back_operate(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers(

@ -41,6 +41,11 @@ class TerminationManager(object):
"""An object responsible for handling the termination of an operation."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def set_expiration_manager(self, expiration_manager):
"""Sets the ExpirationManager with which this object will cooperate."""
raise NotImplementedError()
@abc.abstractmethod
def is_active(self):
"""Reports whether or not the operation is active.
@ -169,6 +174,7 @@ class IngestionManager(stream.Consumer):
@abc.abstractmethod
def set_expiration_manager(self, expiration_manager):
"""Sets the ExpirationManager with which this object will cooperate."""
raise NotImplementedError()
@abc.abstractmethod
def start(self, requirement):

@ -86,11 +86,15 @@ class _TerminationManager(_interfaces.TerminationManager):
self._action = action
self._local_failure = local_failure
self._has_locally_failed = False
self._expiration_manager = None
self._outstanding_requirements = set(requirements)
self._kind = None
self._callbacks = []
def set_expiration_manager(self, expiration_manager):
self._expiration_manager = expiration_manager
def _terminate(self, kind):
"""Terminates the operation.
@ -100,6 +104,7 @@ class _TerminationManager(_interfaces.TerminationManager):
packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or
packets.Kind.SERVICED_FAILURE.
"""
self._expiration_manager.abort()
self._outstanding_requirements = None
callbacks = list(self._callbacks)
self._callbacks = None

@ -41,6 +41,7 @@ from grpc.framework.face.testing import stock_service
from grpc.framework.face.testing import test_case
_TIMEOUT = 3
_LONG_TIMEOUT = 45
class BlockingInvocationInlineServiceTestCase(
@ -82,7 +83,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response = self.stub.blocking_value_in_value_out(
name, request, _TIMEOUT)
name, request, _LONG_TIMEOUT)
test_messages.verify(request, response, self)
@ -93,7 +94,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT)
name, request, _LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@ -105,7 +106,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response = self.stub.blocking_stream_in_value_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), _LONG_TIMEOUT)
test_messages.verify(requests, response, self)
@ -116,7 +117,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT)
name, iter(requests), _LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)

Loading…
Cancel
Save