import logging
import sys
import threading
from hazelcast.util import AtomicInteger
from hazelcast import six
_logger = logging.getLogger(__name__)
NONE_RESULT = object()
[docs]class Future(object):
"""Future is used for representing an asynchronous computation result."""
_result = None
_exception = None
_traceback = None
_threading_locals = threading.local()
def __init__(self):
self._callbacks = []
self._event = _Event()
[docs] def set_result(self, result):
"""Sets the result of the Future.
Args:
result: Result of the Future.
"""
if result is None:
self._result = NONE_RESULT
else:
self._result = result
self._event.set()
self._invoke_callbacks()
[docs] def set_exception(self, exception, traceback=None):
"""Sets the exception for this Future in case of errors.
Args:
exception (Exception): Exception to be threw in case of error.
traceback (function): Function to be called on traceback.
"""
if not isinstance(exception, BaseException):
raise RuntimeError("Exception must be of BaseException type")
self._exception = exception
self._traceback = traceback
self._event.set()
self._invoke_callbacks()
[docs] def result(self):
"""Returns the result of the Future, which makes the call synchronous if the result has not been computed yet.
Returns:
Result of the Future.
"""
self._reactor_check()
self._event.wait()
if self._exception:
six.reraise(self._exception.__class__, self._exception, self._traceback)
if self._result == NONE_RESULT:
return None
else:
return self._result
def _reactor_check(self):
if not self.done() and hasattr(self._threading_locals, "is_reactor_thread"):
raise RuntimeError(
"Synchronous result for incomplete operation must not be called from Reactor thread. "
"Use add_done_callback instead."
)
[docs] def is_success(self):
"""Determines whether the result can be successfully computed or not."""
return self._result is not None
[docs] def done(self):
"""Determines whether the result is computed or not.
Returns:
bool: ``True`` if the result is computed, ``False`` otherwise.
"""
return self._event.is_set()
[docs] def running(self):
"""Determines whether the asynchronous call, the computation is still running or not.
Returns:
bool: ``True`` if the result is being computed, ``False`` otherwise.
"""
return not self.done()
[docs] def exception(self):
"""Returns the exceptional result, if any.
Returns:
Exception: Exception of this Future.
"""
self._reactor_check()
self._event.wait()
return self._exception
[docs] def traceback(self):
"""Traceback function for the Future."""
self._reactor_check()
self._event.wait()
return self._traceback
[docs] def add_done_callback(self, callback):
run_callback = False
with self._event.condition:
if self.done():
run_callback = True
else:
self._callbacks.append(callback)
if run_callback:
self._invoke_cb(callback)
def _invoke_callbacks(self):
for callback in self._callbacks:
self._invoke_cb(callback)
def _invoke_cb(self, callback):
try:
callback(self)
except:
_logger.exception("Exception when invoking callback")
[docs] def continue_with(self, continuation_func, *args):
"""Create a continuation that executes when the Future is completed.
Args:
continuation_func (function): A function which takes the Future as the only parameter.
Return value of the function will be set as the result of the continuation future.
If the return value of the function is another Future, it will be chained
to the returned Future.
*args: Arguments to be passed into ``continuation_function``.
Returns:
Future: A new Future which will be completed when the continuation is done.
"""
future = Future()
def callback(f):
try:
result = continuation_func(f, *args)
if isinstance(result, Future):
future._chain(result)
else:
future.set_result(result)
except:
exception, traceback = sys.exc_info()[1:]
future.set_exception(exception, traceback)
self.add_done_callback(callback)
return future
def _chain(self, chained_future):
def callback(f):
try:
result = f.result()
if isinstance(result, Future):
self._chain(result)
else:
self.set_result(result)
except:
exception, traceback = sys.exc_info()[1:]
self.set_exception(exception, traceback)
chained_future.add_done_callback(callback)
class _Event(object):
_flag = False
def __init__(self):
self.condition = threading.Condition(threading.Lock())
def set(self):
with self.condition:
self._flag = True
self.condition.notify_all()
def is_set(self):
return self._flag
def wait(self):
with self.condition:
if not self._flag:
self.condition.wait()
return self._flag
class ImmediateFuture(Future):
def __init__(self, result):
self._result = result
def set_exception(self, exception):
raise NotImplementedError()
def set_result(self, result):
raise NotImplementedError()
def done(self):
return True
def is_success(self):
return True
def exception(self):
return None
def traceback(self):
return None
def result(self):
return self._result
def add_done_callback(self, callback):
self._invoke_cb(callback)
class ImmediateExceptionFuture(Future):
def __init__(self, exception, traceback=None):
self._exception = exception
self._traceback = traceback
def set_exception(self, exception, traceback=None):
raise NotImplementedError()
def set_result(self, result):
raise NotImplementedError()
def done(self):
return True
def is_success(self):
return False
def exception(self):
return self._exception
def traceback(self):
return self._traceback
def result(self):
six.reraise(self._exception.__class__, self._exception, self._traceback)
def add_done_callback(self, callback):
self._invoke_cb(callback)
[docs]def combine_futures(futures):
"""Combines set of Futures.
It waits for the completion of the all input Futures regardless
of their output.
The returned Future completes with the list of the results of the input
Futures, respecting the input order.
If one of the input Futures completes exceptionally, the returned
Future also completes exceptionally. In case of multiple exceptional
completions, the returned Future will be completed with the first
exceptional result.
Args:
futures (list[Future]): List of Futures to be combined.
Returns:
Future: Result of the combination.
"""
count = len(futures)
results = [None] * count
if count == 0:
return ImmediateFuture(results)
completed = AtomicInteger()
combined = Future()
errors = []
def done(future, index):
if future.is_success():
results[index] = future.result()
else:
if not errors:
# We are fine with this check-then-act.
# At most, we will end up with couple of
# errors stored in case of the concurrent calls.
# The idea behind this check is to try to minimize
# the number of errors we store without
# synchronization, as we only need the first error.
errors.append((future.exception(), future.traceback()))
if count == completed.increment_and_get():
if errors:
first_exception, first_traceback = errors[0]
combined.set_exception(first_exception, first_traceback)
else:
combined.set_result(results)
for index, future in enumerate(futures):
# Capture the index in the closure or else we
# will only update the last element.
future.add_done_callback(lambda f, captured_index=index: done(f, captured_index))
return combined
class _BlockingWrapper(object):
def __init__(self, wrapped):
self._wrapped = wrapped
def __getattr__(self, item):
inner = getattr(self._wrapped, item)
if callable(inner):
return self.wrap(inner)
return inner
def wrap(self, inner):
def f(*args, **kwargs):
result = inner(*args, **kwargs)
if isinstance(result, Future):
return result.result()
return result
return f
def __repr__(self):
return self._wrapped.__repr__()
def make_blocking(instance):
"""Takes an instance and returns an object whose methods which return non-blocking Future become blocking calls.
Args:
instance: A non-blocking instance.
Returns:
Blocking version of given non-blocking instance.
"""
return _BlockingWrapper(instance)