Merge pull request #521 from nathanielmanistaatgoogle/future

Change the interface of RPC Framework's Future interface.
pull/522/head
soltanmm 10 years ago
commit 3793267ede
  1. 185
      src/python/src/_framework/face/_calls.py
  2. 72
      src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
  3. 62
      src/python/src/_framework/foundation/_later_test.py
  4. 116
      src/python/src/_framework/foundation/_timer_future.py
  5. 39
      src/python/src/_framework/foundation/callable_util.py
  6. 222
      src/python/src/_framework/foundation/future.py

@ -29,6 +29,7 @@
"""Utility functions for invoking RPCs.""" """Utility functions for invoking RPCs."""
import sys
import threading import threading
from _framework.base import interfaces as base_interfaces from _framework.base import interfaces as base_interfaces
@ -79,20 +80,46 @@ def _stream_event_subscription(result_consumer, abortion_callback):
_EventServicedIngestor(result_consumer, abortion_callback)) _EventServicedIngestor(result_consumer, abortion_callback))
# NOTE(nathaniel): This class has some extremely special semantics around
# cancellation that allow it to be used by both "blocking" APIs and "futures"
# APIs.
#
# Since futures.Future defines its own exception for cancellation, we want these
# objects, when returned by methods of a returning-Futures-from-other-methods
# object, to raise the same exception for cancellation. But that's weird in a
# blocking API - why should this object, also returned by methods of blocking
# APIs, raise exceptions from the "future" module? Should we do something like
# have this class be parameterized by the type of exception that it raises in
# cancellation circumstances?
#
# We don't have to take such a dramatic step: since blocking APIs define no
# cancellation semantics whatsoever, there is no supported way for
# blocking-API-users of these objects to cancel RPCs, and thus no supported way
# for them to see an exception the type of which would be weird to them.
#
# Bonus: in both blocking and futures APIs, this object still properly raises
# exceptions.CancellationError for any *server-side cancellation* of an RPC.
class _OperationCancellableIterator(interfaces.CancellableIterator): class _OperationCancellableIterator(interfaces.CancellableIterator):
"""An interfaces.CancellableIterator for response-streaming operations.""" """An interfaces.CancellableIterator for response-streaming operations."""
def __init__(self, rendezvous, operation): def __init__(self, rendezvous, operation):
self._lock = threading.Lock()
self._rendezvous = rendezvous self._rendezvous = rendezvous
self._operation = operation self._operation = operation
self._cancelled = False
def __iter__(self): def __iter__(self):
return self return self
def next(self): def next(self):
with self._lock:
if self._cancelled:
raise future.CancelledError()
return next(self._rendezvous) return next(self._rendezvous)
def cancel(self): def cancel(self):
with self._lock:
self._cancelled = True
self._operation.cancel() self._operation.cancel()
self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED) self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED)
@ -105,46 +132,126 @@ class _OperationFuture(future.Future):
self._rendezvous = rendezvous self._rendezvous = rendezvous
self._operation = operation self._operation = operation
self._outcome = None self._cancelled = False
self._computed = False
self._payload = None
self._exception = None
self._traceback = None
self._callbacks = [] self._callbacks = []
def cancel(self): def cancel(self):
"""See future.Future.cancel for specification.""" """See future.Future.cancel for specification."""
with self._condition: with self._condition:
if self._outcome is None: if not self._cancelled and not self._computed:
self._operation.cancel() self._operation.cancel()
self._outcome = future.aborted() self._cancelled = True
self._condition.notify_all() self._condition.notify_all()
return False return False
def cancelled(self): def cancelled(self):
"""See future.Future.cancelled for specification.""" """See future.Future.cancelled for specification."""
return False with self._condition:
return self._cancelled
def running(self):
"""See future.Future.running for specification."""
with self._condition:
return not self._cancelled and not self._computed
def done(self): def done(self):
"""See future.Future.done for specification.""" """See future.Future.done for specification."""
with self._condition: with self._condition:
return (self._outcome is not None and return self._cancelled or self._computed
self._outcome.category is not future.ABORTED)
def outcome(self): def result(self, timeout=None):
"""See future.Future.outcome for specification.""" """See future.Future.result for specification."""
with self._condition: with self._condition:
while self._outcome is None: if self._cancelled:
self._condition.wait() raise future.CancelledError()
return self._outcome if self._computed:
if self._payload is None:
raise self._exception # pylint: disable=raising-bad-type
else:
return self._payload
condition = threading.Condition()
def notify_condition(unused_future):
with condition:
condition.notify()
self._callbacks.append(notify_condition)
with condition:
condition.wait(timeout=timeout)
with self._condition:
if self._cancelled:
raise future.CancelledError()
elif self._computed:
if self._payload is None:
raise self._exception # pylint: disable=raising-bad-type
else:
return self._payload
else:
raise future.TimeoutError()
def exception(self, timeout=None):
"""See future.Future.exception for specification."""
with self._condition:
if self._cancelled:
raise future.CancelledError()
if self._computed:
return self._exception
condition = threading.Condition()
def notify_condition(unused_future):
with condition:
condition.notify()
self._callbacks.append(notify_condition)
with condition:
condition.wait(timeout=timeout)
with self._condition:
if self._cancelled:
raise future.CancelledError()
elif self._computed:
return self._exception
else:
raise future.TimeoutError()
def add_done_callback(self, callback): def traceback(self, timeout=None):
"""See future.Future.traceback for specification."""
with self._condition:
if self._cancelled:
raise future.CancelledError()
if self._computed:
return self._traceback
condition = threading.Condition()
def notify_condition(unused_future):
with condition:
condition.notify()
self._callbacks.append(notify_condition)
with condition:
condition.wait(timeout=timeout)
with self._condition:
if self._cancelled:
raise future.CancelledError()
elif self._computed:
return self._traceback
else:
raise future.TimeoutError()
def add_done_callback(self, fn):
"""See future.Future.add_done_callback for specification.""" """See future.Future.add_done_callback for specification."""
with self._condition: with self._condition:
if self._callbacks is not None: if self._callbacks is not None:
self._callbacks.add(callback) self._callbacks.add(fn)
return return
outcome = self._outcome callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self)
callable_util.call_logging_exceptions(
callback, _DONE_CALLBACK_LOG_MESSAGE, outcome)
def on_operation_termination(self, operation_outcome): def on_operation_termination(self, operation_outcome):
"""Indicates to this object that the operation has terminated. """Indicates to this object that the operation has terminated.
@ -154,34 +261,42 @@ class _OperationFuture(future.Future):
outcome of the operation. outcome of the operation.
""" """
with self._condition: with self._condition:
if (self._outcome is None and cancelled = self._cancelled
operation_outcome is not base_interfaces.Outcome.COMPLETED): if cancelled:
self._outcome = future.raised(
_control.abortion_outcome_to_exception(operation_outcome))
self._condition.notify_all()
outcome = self._outcome
rendezvous = self._rendezvous
callbacks = list(self._callbacks) callbacks = list(self._callbacks)
self._callbacks = None self._callbacks = None
else:
rendezvous = self._rendezvous
if outcome is None: if not cancelled:
payload = None
exception = None
traceback = None
if operation_outcome == base_interfaces.Outcome.COMPLETED:
try: try:
return_value = next(rendezvous) payload = next(rendezvous)
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
outcome = future.raised(e) exception = e
traceback = sys.exc_info()[2]
else: else:
outcome = future.returned(return_value) try:
# We raise and then immediately catch in order to create a traceback.
raise _control.abortion_outcome_to_exception(operation_outcome)
except Exception as e: # pylint: disable=broad-except
exception = e
traceback = sys.exc_info()[2]
with self._condition: with self._condition:
if self._outcome is None: if not self._cancelled:
self._outcome = outcome self._computed = True
self._condition.notify_all() self._payload = payload
else: self._exception = exception
outcome = self._outcome self._traceback = traceback
callbacks = list(self._callbacks)
self._callbacks = None
for callback in callbacks: for callback in callbacks:
callable_util.call_logging_exceptions( callable_util.call_logging_exceptions(
callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) callback, _DONE_CALLBACK_LOG_MESSAGE, self)
class _Call(interfaces.Call): class _Call(interfaces.Call):

@ -116,7 +116,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
response_future = self.stub.future_value_in_value_out( response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT) name, request, _TIMEOUT)
response = response_future.outcome().return_value response = response_future.result()
test_messages.verify(request, response, self) test_messages.verify(request, response, self)
@ -144,7 +144,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with request_iterator.pause(): with request_iterator.pause():
response_future = self.stub.future_stream_in_value_out( response_future = self.stub.future_stream_in_value_out(
name, request_iterator, _TIMEOUT) name, request_iterator, _TIMEOUT)
response = response_future.outcome().return_value response = response_future.result()
test_messages.verify(requests, response, self) test_messages.verify(requests, response, self)
@ -173,13 +173,13 @@ class FutureInvocationAsynchronousEventServiceTestCase(
first_response_future = self.stub.future_value_in_value_out( first_response_future = self.stub.future_value_in_value_out(
name, first_request, _TIMEOUT) name, first_request, _TIMEOUT)
first_response = first_response_future.outcome().return_value first_response = first_response_future.result()
test_messages.verify(first_request, first_response, self) test_messages.verify(first_request, first_response, self)
second_response_future = self.stub.future_value_in_value_out( second_response_future = self.stub.future_value_in_value_out(
name, second_request, _TIMEOUT) name, second_request, _TIMEOUT)
second_response = second_response_future.outcome().return_value second_response = second_response_future.result()
test_messages.verify(second_request, second_response, self) test_messages.verify(second_request, second_response, self)
@ -192,10 +192,10 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause(): with self.control.pause():
response_future = self.stub.future_value_in_value_out( response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT) name, request, _TIMEOUT)
outcome = response_future.outcome()
self.assertIsInstance( self.assertIsInstance(
outcome.exception, exceptions.ExpirationError) response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
def testExpiredUnaryRequestStreamResponse(self): def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -203,10 +203,10 @@ class FutureInvocationAsynchronousEventServiceTestCase(
for test_messages in test_messages_sequence: for test_messages in test_messages_sequence:
request = test_messages.request() request = test_messages.request()
with self.control.pause(), self.assertRaises( with self.control.pause():
exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out( response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT) name, request, _TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(response_iterator) list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self): def testExpiredStreamRequestUnaryResponse(self):
@ -218,10 +218,10 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause(): with self.control.pause():
response_future = self.stub.future_stream_in_value_out( response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT) name, iter(requests), _TIMEOUT)
outcome = response_future.outcome()
self.assertIsInstance( self.assertIsInstance(
outcome.exception, exceptions.ExpirationError) response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
def testExpiredStreamRequestStreamResponse(self): def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -229,10 +229,10 @@ class FutureInvocationAsynchronousEventServiceTestCase(
for test_messages in test_messages_sequence: for test_messages in test_messages_sequence:
requests = test_messages.requests() requests = test_messages.requests()
with self.control.pause(), self.assertRaises( with self.control.pause():
exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out( response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT) name, iter(requests), _TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(response_iterator) list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self): def testFailedUnaryRequestUnaryResponse(self):
@ -244,13 +244,15 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.fail(): with self.control.fail():
response_future = self.stub.future_value_in_value_out( response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT) name, request, _TIMEOUT)
outcome = response_future.outcome()
# Because the servicer fails outside of the thread from which the # Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is indistinguishable # servicer-side runtime called into it its failure is
# from simply not having called its response_callback before the # indistinguishable from simply not having called its
# expiration of the RPC. # response_callback before the expiration of the RPC.
self.assertIsInstance(outcome.exception, exceptions.ExpirationError) self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
def testFailedUnaryRequestStreamResponse(self): def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -276,13 +278,15 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.fail(): with self.control.fail():
response_future = self.stub.future_stream_in_value_out( response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT) name, iter(requests), _TIMEOUT)
outcome = response_future.outcome()
# Because the servicer fails outside of the thread from which the # Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is indistinguishable # servicer-side runtime called into it its failure is
# from simply not having called its response_callback before the # indistinguishable from simply not having called its
# expiration of the RPC. # response_callback before the expiration of the RPC.
self.assertIsInstance(outcome.exception, exceptions.ExpirationError) self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
def testFailedStreamRequestStreamResponse(self): def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -310,8 +314,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
name, first_request, _TIMEOUT) name, first_request, _TIMEOUT)
second_response_future = self.stub.future_value_in_value_out( second_response_future = self.stub.future_value_in_value_out(
name, second_request, _TIMEOUT) name, second_request, _TIMEOUT)
first_response = first_response_future.outcome().return_value first_response = first_response_future.result()
second_response = second_response_future.outcome().return_value second_response = second_response_future.result()
test_messages.verify(first_request, first_response, self) test_messages.verify(first_request, first_response, self)
test_messages.verify(second_request, second_response, self) test_messages.verify(second_request, second_response, self)
@ -329,10 +333,10 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause(): with self.control.pause():
response_future = self.stub.future_value_in_value_out( response_future = self.stub.future_value_in_value_out(
name, request, _TIMEOUT) name, request, _TIMEOUT)
cancelled = response_future.cancel() cancel_method_return_value = response_future.cancel()
self.assertFalse(cancelled) self.assertFalse(cancel_method_return_value)
self.assertEqual(future.ABORTED, response_future.outcome().category) self.assertTrue(response_future.cancelled())
def testCancelledUnaryRequestStreamResponse(self): def testCancelledUnaryRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -345,7 +349,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
name, request, _TIMEOUT) name, request, _TIMEOUT)
response_iterator.cancel() response_iterator.cancel()
with self.assertRaises(exceptions.CancellationError): with self.assertRaises(future.CancelledError):
next(response_iterator) next(response_iterator)
def testCancelledStreamRequestUnaryResponse(self): def testCancelledStreamRequestUnaryResponse(self):
@ -357,10 +361,10 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause(): with self.control.pause():
response_future = self.stub.future_stream_in_value_out( response_future = self.stub.future_stream_in_value_out(
name, iter(requests), _TIMEOUT) name, iter(requests), _TIMEOUT)
cancelled = response_future.cancel() cancel_method_return_value = response_future.cancel()
self.assertFalse(cancelled) self.assertFalse(cancel_method_return_value)
self.assertEqual(future.ABORTED, response_future.outcome().category) self.assertTrue(response_future.cancelled())
def testCancelledStreamRequestStreamResponse(self): def testCancelledStreamRequestStreamResponse(self):
for name, test_messages_sequence in ( for name, test_messages_sequence in (
@ -373,5 +377,5 @@ class FutureInvocationAsynchronousEventServiceTestCase(
name, iter(requests), _TIMEOUT) name, iter(requests), _TIMEOUT)
response_iterator.cancel() response_iterator.cancel()
with self.assertRaises(exceptions.CancellationError): with self.assertRaises(future.CancelledError):
next(response_iterator) next(response_iterator)

@ -33,7 +33,6 @@ import threading
import time import time
import unittest import unittest
from _framework.foundation import future
from _framework.foundation import later from _framework.foundation import later
TICK = 0.1 TICK = 0.1
@ -44,10 +43,14 @@ class LaterTest(unittest.TestCase):
def test_simple_delay(self): def test_simple_delay(self):
lock = threading.Lock() lock = threading.Lock()
cell = [0] cell = [0]
def increment_cell(): return_value = object()
def computation():
with lock: with lock:
cell[0] += 1 cell[0] += 1
computation_future = later.later(TICK * 2, increment_cell) return return_value
computation_future = later.later(TICK * 2, computation)
self.assertFalse(computation_future.done()) self.assertFalse(computation_future.done())
self.assertFalse(computation_future.cancelled()) self.assertFalse(computation_future.cancelled())
time.sleep(TICK) time.sleep(TICK)
@ -60,22 +63,21 @@ class LaterTest(unittest.TestCase):
self.assertFalse(computation_future.cancelled()) self.assertFalse(computation_future.cancelled())
with lock: with lock:
self.assertEqual(1, cell[0]) self.assertEqual(1, cell[0])
outcome = computation_future.outcome() self.assertEqual(return_value, computation_future.result())
self.assertEqual(future.RETURNED, outcome.category)
def test_callback(self): def test_callback(self):
lock = threading.Lock() lock = threading.Lock()
cell = [0] cell = [0]
callback_called = [False] callback_called = [False]
outcome_passed_to_callback = [None] future_passed_to_callback = [None]
def increment_cell(): def computation():
with lock: with lock:
cell[0] += 1 cell[0] += 1
computation_future = later.later(TICK * 2, increment_cell) computation_future = later.later(TICK * 2, computation)
def callback(outcome): def callback(outcome):
with lock: with lock:
callback_called[0] = True callback_called[0] = True
outcome_passed_to_callback[0] = outcome future_passed_to_callback[0] = outcome
computation_future.add_done_callback(callback) computation_future.add_done_callback(callback)
time.sleep(TICK) time.sleep(TICK)
with lock: with lock:
@ -83,63 +85,67 @@ class LaterTest(unittest.TestCase):
time.sleep(TICK * 2) time.sleep(TICK * 2)
with lock: with lock:
self.assertTrue(callback_called[0]) self.assertTrue(callback_called[0])
self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) self.assertTrue(future_passed_to_callback[0].done())
callback_called[0] = False callback_called[0] = False
outcome_passed_to_callback[0] = None future_passed_to_callback[0] = None
computation_future.add_done_callback(callback) computation_future.add_done_callback(callback)
with lock: with lock:
self.assertTrue(callback_called[0]) self.assertTrue(callback_called[0])
self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) self.assertTrue(future_passed_to_callback[0].done())
def test_cancel(self): def test_cancel(self):
lock = threading.Lock() lock = threading.Lock()
cell = [0] cell = [0]
callback_called = [False] callback_called = [False]
outcome_passed_to_callback = [None] future_passed_to_callback = [None]
def increment_cell(): def computation():
with lock: with lock:
cell[0] += 1 cell[0] += 1
computation_future = later.later(TICK * 2, increment_cell) computation_future = later.later(TICK * 2, computation)
def callback(outcome): def callback(outcome):
with lock: with lock:
callback_called[0] = True callback_called[0] = True
outcome_passed_to_callback[0] = outcome future_passed_to_callback[0] = outcome
computation_future.add_done_callback(callback) computation_future.add_done_callback(callback)
time.sleep(TICK) time.sleep(TICK)
with lock: with lock:
self.assertFalse(callback_called[0]) self.assertFalse(callback_called[0])
computation_future.cancel() computation_future.cancel()
self.assertTrue(computation_future.cancelled()) self.assertTrue(computation_future.cancelled())
self.assertFalse(computation_future.done()) self.assertFalse(computation_future.running())
self.assertEqual(future.ABORTED, computation_future.outcome().category) self.assertTrue(computation_future.done())
with lock: with lock:
self.assertTrue(callback_called[0]) self.assertTrue(callback_called[0])
self.assertEqual(future.ABORTED, outcome_passed_to_callback[0].category) self.assertTrue(future_passed_to_callback[0].cancelled())
def test_outcome(self): def test_result(self):
lock = threading.Lock() lock = threading.Lock()
cell = [0] cell = [0]
callback_called = [False] callback_called = [False]
outcome_passed_to_callback = [None] future_passed_to_callback_cell = [None]
def increment_cell(): return_value = object()
def computation():
with lock: with lock:
cell[0] += 1 cell[0] += 1
computation_future = later.later(TICK * 2, increment_cell) return return_value
def callback(outcome): computation_future = later.later(TICK * 2, computation)
def callback(future_passed_to_callback):
with lock: with lock:
callback_called[0] = True callback_called[0] = True
outcome_passed_to_callback[0] = outcome future_passed_to_callback_cell[0] = future_passed_to_callback
computation_future.add_done_callback(callback) computation_future.add_done_callback(callback)
returned_outcome = computation_future.outcome() returned_value = computation_future.result()
self.assertEqual(future.RETURNED, returned_outcome.category) self.assertEqual(return_value, returned_value)
# The callback may not yet have been called! Sleep a tick. # The callback may not yet have been called! Sleep a tick.
time.sleep(TICK) time.sleep(TICK)
with lock: with lock:
self.assertTrue(callback_called[0]) self.assertTrue(callback_called[0])
self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) self.assertEqual(return_value, future_passed_to_callback_cell[0].result())
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

@ -29,6 +29,7 @@
"""Affords a Future implementation based on Python's threading.Timer.""" """Affords a Future implementation based on Python's threading.Timer."""
import sys
import threading import threading
import time import time
@ -52,7 +53,9 @@ class TimerFuture(future.Future):
self._computing = False self._computing = False
self._computed = False self._computed = False
self._cancelled = False self._cancelled = False
self._outcome = None self._result = None
self._exception = None
self._traceback = None
self._waiting = [] self._waiting = []
def _compute(self): def _compute(self):
@ -70,19 +73,24 @@ class TimerFuture(future.Future):
self._computing = True self._computing = True
try: try:
returned_value = self._computation() return_value = self._computation()
outcome = future.returned(returned_value) exception = None
traceback = None
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
outcome = future.raised(e) return_value = None
exception = e
traceback = sys.exc_info()[2]
with self._lock: with self._lock:
self._computing = False self._computing = False
self._computed = True self._computed = True
self._outcome = outcome self._return_value = return_value
self._exception = exception
self._traceback = traceback
waiting = self._waiting waiting = self._waiting
for callback in waiting: for callback in waiting:
callback(outcome) callback(self)
def start(self): def start(self):
"""Starts this Future. """Starts this Future.
@ -104,13 +112,11 @@ class TimerFuture(future.Future):
else: else:
self._timer.cancel() self._timer.cancel()
self._cancelled = True self._cancelled = True
self._outcome = future.aborted()
outcome = self._outcome
waiting = self._waiting waiting = self._waiting
for callback in waiting: for callback in waiting:
try: try:
callback(outcome) callback(self)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
pass pass
@ -121,36 +127,102 @@ class TimerFuture(future.Future):
with self._lock: with self._lock:
return self._cancelled return self._cancelled
def running(self):
"""See future.Future.running for specification."""
with self._lock:
return not self._computed and not self._cancelled
def done(self): def done(self):
"""See future.Future.done for specification.""" """See future.Future.done for specification."""
with self._lock: with self._lock:
return self._computed return self._computed or self._cancelled
def result(self, timeout=None):
"""See future.Future.result for specification."""
with self._lock:
if self._cancelled:
raise future.CancelledError()
elif self._computed:
if self._exception is None:
return self._return_value
else:
raise self._exception # pylint: disable=raising-bad-type
condition = threading.Condition()
def notify_condition(unused_future):
with condition:
condition.notify()
self._waiting.append(notify_condition)
with condition:
condition.wait(timeout=timeout)
with self._lock:
if self._cancelled:
raise future.CancelledError()
elif self._computed:
if self._exception is None:
return self._return_value
else:
raise self._exception # pylint: disable=raising-bad-type
else:
raise future.TimeoutError()
def exception(self, timeout=None):
"""See future.Future.exception for specification."""
with self._lock:
if self._cancelled:
raise future.CancelledError()
elif self._computed:
return self._exception
condition = threading.Condition()
def notify_condition(unused_future):
with condition:
condition.notify()
self._waiting.append(notify_condition)
with condition:
condition.wait(timeout=timeout)
def outcome(self):
"""See future.Future.outcome for specification."""
with self._lock: with self._lock:
if self._computed or self._cancelled: if self._cancelled:
return self._outcome raise future.CancelledError()
elif self._computed:
return self._exception
else:
raise future.TimeoutError()
def traceback(self, timeout=None):
"""See future.Future.traceback for specification."""
with self._lock:
if self._cancelled:
raise future.CancelledError()
elif self._computed:
return self._traceback
condition = threading.Condition() condition = threading.Condition()
def notify_condition(unused_outcome): def notify_condition(unused_future):
with condition: with condition:
condition.notify() condition.notify()
self._waiting.append(notify_condition) self._waiting.append(notify_condition)
with condition: with condition:
condition.wait() condition.wait(timeout=timeout)
with self._lock: with self._lock:
return self._outcome if self._cancelled:
raise future.CancelledError()
elif self._computed:
return self._traceback
else:
raise future.TimeoutError()
def add_done_callback(self, callback): def add_done_callback(self, fn):
"""See future.Future.add_done_callback for specification.""" """See future.Future.add_done_callback for specification."""
with self._lock: with self._lock:
if not self._computed and not self._cancelled: if not self._computed and not self._cancelled:
self._waiting.append(callback) self._waiting.append(fn)
return return
else:
outcome = self._outcome
callback(outcome) fn(self)

@ -29,18 +29,47 @@
"""Utilities for working with callables.""" """Utilities for working with callables."""
import abc
import collections
import enum
import functools import functools
import logging import logging
from _framework.foundation import future
class Outcome(object):
"""A sum type describing the outcome of some call.
Attributes:
kind: One of Kind.RETURNED or Kind.RAISED respectively indicating that the
call returned a value or raised an exception.
return_value: The value returned by the call. Must be present if kind is
Kind.RETURNED.
exception: The exception raised by the call. Must be present if kind is
Kind.RAISED.
"""
__metaclass__ = abc.ABCMeta
@enum.unique
class Kind(enum.Enum):
"""Identifies the general kind of the outcome of some call."""
RETURNED = object()
RAISED = object()
class _EasyOutcome(
collections.namedtuple(
'_EasyOutcome', ['kind', 'return_value', 'exception']),
Outcome):
"""A trivial implementation of Outcome."""
def _call_logging_exceptions(behavior, message, *args, **kwargs): def _call_logging_exceptions(behavior, message, *args, **kwargs):
try: try:
return future.returned(behavior(*args, **kwargs)) return _EasyOutcome(Outcome.Kind.RETURNED, behavior(*args, **kwargs), None)
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
logging.exception(message) logging.exception(message)
return future.raised(e) return _EasyOutcome(Outcome.Kind.RAISED, None, e)
def with_exceptions_logged(behavior, message): def with_exceptions_logged(behavior, message):
@ -72,7 +101,7 @@ def call_logging_exceptions(behavior, message, *args, **kwargs):
**kwargs: Keyword arguments to pass to the given behavior. **kwargs: Keyword arguments to pass to the given behavior.
Returns: Returns:
A future.Outcome describing whether the given behavior returned a value or An Outcome describing whether the given behavior returned a value or raised
raised an exception. an exception.
""" """
return _call_logging_exceptions(behavior, message, *args, **kwargs) return _call_logging_exceptions(behavior, message, *args, **kwargs)

@ -27,146 +27,210 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The Future interface missing from Python's standard library. """A Future interface.
Python's concurrent.futures library defines a Future class very much like the Python doesn't have a Future interface in its standard library. In the absence
Future defined here, but since that class is concrete and without construction of such a standard, three separate, incompatible implementations
semantics it is only available within the concurrent.futures library itself. (concurrent.futures.Future, ndb.Future, and asyncio.Future) have appeared. This
The Future class defined here is an entirely abstract interface that anyone may interface attempts to be as compatible as possible with
concurrent.futures.Future. From ndb.Future it adopts a traceback-object accessor
method.
Unlike the concrete and implemented Future classes listed above, the Future
class defined in this module is an entirely abstract interface that anyone may
implement and use. implement and use.
The one known incompatibility between this interface and the interface of
concurrent.futures.Future is that this interface defines its own CancelledError
and TimeoutError exceptions rather than raising the implementation-private
concurrent.futures._base.CancelledError and the
built-in-but-only-in-3.3-and-later TimeoutError.
""" """
import abc import abc
import collections
RETURNED = object()
RAISED = object()
ABORTED = object()
class Outcome(object):
"""A sum type describing the outcome of some computation.
Attributes:
category: One of RETURNED, RAISED, or ABORTED, respectively indicating
that the computation returned a value, raised an exception, or was
aborted.
return_value: The value returned by the computation. Must be present if
category is RETURNED.
exception: The exception raised by the computation. Must be present if
category is RAISED.
"""
__metaclass__ = abc.ABCMeta
class TimeoutError(Exception):
"""Indicates that a particular call timed out."""
class _EasyOutcome(
collections.namedtuple('_EasyOutcome',
['category', 'return_value', 'exception']),
Outcome):
"""A trivial implementation of Outcome."""
# All Outcomes describing abortion are indistinguishable so there might as well class CancelledError(Exception):
# be only one. """Indicates that the computation underlying a Future was cancelled."""
_ABORTED_OUTCOME = _EasyOutcome(ABORTED, None, None)
def aborted(): class Future(object):
"""Returns an Outcome indicating that a computation was aborted. """A representation of a computation in another control flow.
Returns: Computations represented by a Future may be yet to be begun, may be ongoing,
An Outcome indicating that a computation was aborted. or may have already completed.
""" """
return _ABORTED_OUTCOME __metaclass__ = abc.ABCMeta
def raised(exception): # NOTE(nathaniel): This isn't the return type that I would want to have if it
"""Returns an Outcome indicating that a computation raised an exception. # were up to me. Were this interface being written from scratch, the return
# type of this method would probably be a sum type like:
#
# NOT_COMMENCED
# COMMENCED_AND_NOT_COMPLETED
# PARTIAL_RESULT<Partial_Result_Type>
# COMPLETED<Result_Type>
# UNCANCELLABLE
# NOT_IMMEDIATELY_DETERMINABLE
@abc.abstractmethod
def cancel(self):
"""Attempts to cancel the computation.
Args: This method does not block.
exception: The exception raised by the computation.
Returns: Returns:
An Outcome indicating that a computation raised the given exception. True if the computation has not yet begun, will not be allowed to take
place, and determination of both was possible without blocking. False
under all other circumstances including but not limited to the
computation's already having begun, the computation's already having
finished, and the computation's having been scheduled for execution on a
remote system for which a determination of whether or not it commenced
before being cancelled cannot be made without blocking.
""" """
return _EasyOutcome(RAISED, None, exception) raise NotImplementedError()
def returned(value): # NOTE(nathaniel): Here too this isn't the return type that I'd want this
"""Returns an Outcome indicating that a computation returned a value. # method to have if it were up to me. I think I'd go with another sum type
# like:
#
# NOT_CANCELLED (this object's cancel method hasn't been called)
# NOT_COMMENCED
# COMMENCED_AND_NOT_COMPLETED
# PARTIAL_RESULT<Partial_Result_Type>
# COMPLETED<Result_Type>
# UNCANCELLABLE
# NOT_IMMEDIATELY_DETERMINABLE
#
# Notice how giving the cancel method the right semantics obviates most
# reasons for this method to exist.
@abc.abstractmethod
def cancelled(self):
"""Describes whether the computation was cancelled.
Args: This method does not block.
value: The value returned by the computation.
Returns: Returns:
An Outcome indicating that a computation returned the given value. True if the computation was cancelled any time before its result became
immediately available. False under all other circumstances including but
not limited to this object's cancel method not having been called and
the computation's result having become immediately available.
""" """
return _EasyOutcome(RETURNED, value, None) raise NotImplementedError()
class Future(object): @abc.abstractmethod
"""A representation of a computation happening in another control flow. def running(self):
"""Describes whether the computation is taking place.
Computations represented by a Future may have already completed, may be This method does not block.
ongoing, or may be yet to be begun.
Computations represented by a Future are considered uninterruptable; once Returns:
started they will be allowed to terminate either by returning or raising True if the computation is scheduled to take place in the future or is
an exception. taking place now, or False if the computation took place in the past or
was cancelled.
""" """
__metaclass__ = abc.ABCMeta raise NotImplementedError()
# NOTE(nathaniel): These aren't quite the semantics I'd like here either. I
# would rather this only returned True in cases in which the underlying
# computation completed successfully. A computation's having been cancelled
# conflicts with considering that computation "done".
@abc.abstractmethod @abc.abstractmethod
def cancel(self): def done(self):
"""Attempts to cancel the computation. """Describes whether the computation has taken place.
This method does not block.
Returns: Returns:
True if the computation will not be allowed to take place or False if True if the computation is known to have either completed or have been
the computation has already taken place or is currently taking place. unscheduled or interrupted. False if the computation may possibly be
executing or scheduled to execute later.
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def cancelled(self): def result(self, timeout=None):
"""Describes whether the computation was cancelled. """Accesses the outcome of the computation or raises its exception.
This method may return immediately or may block.
Args:
timeout: The length of time in seconds to wait for the computation to
finish or be cancelled, or None if this method should block until the
computation has finished or is cancelled no matter how long that takes.
Returns: Returns:
True if the computation was cancelled and did not take place or False The return value of the computation.
if the computation took place, is taking place, or is scheduled to
take place in the future. Raises:
TimeoutError: If a timeout value is passed and the computation does not
terminate within the allotted time.
CancelledError: If the computation was cancelled.
Exception: If the computation raised an exception, this call will raise
the same exception.
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def done(self): def exception(self, timeout=None):
"""Describes whether the computation has taken place. """Return the exception raised by the computation.
This method may return immediately or may block.
Args:
timeout: The length of time in seconds to wait for the computation to
terminate or be cancelled, or None if this method should block until
the computation is terminated or is cancelled no matter how long that
takes.
Returns: Returns:
True if the computation took place; False otherwise. The exception raised by the computation, or None if the computation did
not raise an exception.
Raises:
TimeoutError: If a timeout value is passed and the computation does not
terminate within the allotted time.
CancelledError: If the computation was cancelled.
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def outcome(self): def traceback(self, timeout=None):
"""Accesses the outcome of the computation. """Access the traceback of the exception raised by the computation.
If the computation has not yet completed, this method blocks until it has. This method may return immediately or may block.
Args:
timeout: The length of time in seconds to wait for the computation to
terminate or be cancelled, or None if this method should block until
the computation is terminated or is cancelled no matter how long that
takes.
Returns: Returns:
An Outcome describing the outcome of the computation. The traceback of the exception raised by the computation, or None if the
computation did not raise an exception.
Raises:
TimeoutError: If a timeout value is passed and the computation does not
terminate within the allotted time.
CancelledError: If the computation was cancelled.
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def add_done_callback(self, callback): def add_done_callback(self, fn):
"""Adds a function to be called at completion of the computation. """Adds a function to be called at completion of the computation.
The callback will be passed an Outcome object describing the outcome of The callback will be passed this Future object describing the outcome of
the computation. the computation.
If the computation has already completed, the callback will be called If the computation has already completed, the callback will be called
immediately. immediately.
Args: Args:
callback: A callable taking an Outcome as its single parameter. fn: A callable taking a this Future object as its single parameter.
""" """
raise NotImplementedError() raise NotImplementedError()

Loading…
Cancel
Save