Source code for hazelcast.future

import logging
import sys
import threading
import types

import typing

from hazelcast.util import AtomicInteger, re_raise

_logger = logging.getLogger(__name__)
_SENTINEL = object()

ResultType = typing.TypeVar("ResultType")


[docs]class Future(typing.Generic[ResultType]): """Future is used for representing an asynchronous computation result.""" _result = _SENTINEL _exception = None _traceback = None _threading_locals = threading.local() def __init__(self): self._callbacks = [] self._event = _Event()
[docs] def set_result(self, result: ResultType) -> None: """Sets the result of the Future. Args: result: Result of the Future. """ self._result = result self._event.set() self._invoke_callbacks()
[docs] def set_exception(self, exception: Exception, traceback: types.TracebackType = None) -> None: """Sets the exception for this Future in case of errors. Args: exception: Exception to raise in case of error. traceback: Traceback of the exception. """ if not isinstance(exception, BaseException): raise RuntimeError("Exception must be of BaseException type") self._exception = exception self._traceback = traceback self._event.set() self._invoke_callbacks()
[docs] def result(self) -> ResultType: """Returns the result of the Future, which makes the call synchronous if the result has not been computed yet. Returns: Result of the Future. """ self._reactor_check() self._event.wait() if self._exception: re_raise(self._exception, self._traceback) # Result will be set to the correct type before we # return from here return self._result # type: ignore[return-value]
def _reactor_check(self): if not self.done() and hasattr(self._threading_locals, "is_reactor_thread"): raise RuntimeError( "Synchronous result for incomplete operation must not be called from Reactor thread. " "Use add_done_callback instead." )
[docs] def is_success(self) -> bool: """Determines whether the result can be successfully computed or not.""" return self._result is not _SENTINEL
[docs] def done(self) -> bool: """Determines whether the result is computed or not. Returns: ``True`` if the result is computed, ``False`` otherwise. """ return self._event.is_set()
[docs] def running(self) -> bool: """Determines whether the asynchronous call, the computation is still running or not. Returns: ``True`` if the result is being computed, ``False`` otherwise. """ return not self.done()
[docs] def exception(self) -> typing.Optional[Exception]: """Returns the exceptional result, if any. Returns: Exceptional result of this Future. """ self._reactor_check() self._event.wait() return self._exception
[docs] def traceback(self) -> typing.Optional[types.TracebackType]: """Traceback of the exception.""" self._reactor_check() self._event.wait() return self._traceback
[docs] def add_done_callback(self, callback: typing.Callable[["Future"], None]) -> None: run_callback = False with self._event.condition: if self.done(): run_callback = True else: self._callbacks.append(callback) if run_callback: self._invoke_cb(callback)
def _invoke_callbacks(self): for callback in self._callbacks: self._invoke_cb(callback) def _invoke_cb(self, callback): try: callback(self) except: _logger.exception("Exception when invoking callback")
[docs] def continue_with( self, continuation_func: typing.Callable[..., typing.Any], *args: typing.Any ) -> "Future": """Create a continuation that executes when the Future is completed. Args: continuation_func: A function which takes the Future as the only parameter. Return value of the function will be set as the result of the continuation future. If the return value of the function is another Future, it will be chained to the returned Future. *args: Arguments to be passed into ``continuation_function``. Returns: A new Future which will be completed when the continuation is done. """ future: Future[typing.Any] = Future() def callback(f): try: result = continuation_func(f, *args) if isinstance(result, Future): future._chain(result) else: future.set_result(result) except: exception, traceback = sys.exc_info()[1:] future.set_exception(exception, traceback) self.add_done_callback(callback) return future
def _chain(self, chained_future): def callback(f): try: result = f.result() if isinstance(result, Future): self._chain(result) else: self.set_result(result) except: exception, traceback = sys.exc_info()[1:] self.set_exception(exception, traceback) chained_future.add_done_callback(callback)
class _Event: _flag = False def __init__(self): self.condition = threading.Condition(threading.Lock()) def set(self): with self.condition: self._flag = True self.condition.notify_all() def is_set(self): return self._flag def wait(self): with self.condition: if not self._flag: self.condition.wait() return self._flag class ImmediateFuture(Future): def __init__(self, result): self._result = result def set_exception(self, exception, traceback=None): raise NotImplementedError() def set_result(self, result): raise NotImplementedError() def done(self): return True def is_success(self): return True def exception(self): return None def traceback(self): return None def result(self): return self._result def add_done_callback(self, callback): self._invoke_cb(callback) class ImmediateExceptionFuture(Future): def __init__(self, exception, traceback=None): self._exception = exception self._traceback = traceback def set_exception(self, exception, traceback=None): raise NotImplementedError() def set_result(self, result): raise NotImplementedError() def done(self): return True def is_success(self): return False def exception(self): return self._exception def traceback(self): return self._traceback def result(self): re_raise(self._exception, self._traceback) def add_done_callback(self, callback): self._invoke_cb(callback)
[docs]def combine_futures(futures: typing.Sequence[Future]) -> Future: """Combines set of Futures. It waits for the completion of the all input Futures regardless of their output. The returned Future completes with the list of the results of the input Futures, respecting the input order. If one of the input Futures completes exceptionally, the returned Future also completes exceptionally. In case of multiple exceptional completions, the returned Future will be completed with the first exceptional result. Args: futures: List of Futures to be combined. Returns: Result of the combination. """ count = len(futures) results = [None] * count if count == 0: return ImmediateFuture(results) completed = AtomicInteger() combined: Future[typing.List[typing.Any]] = Future() errors: typing.List[typing.Tuple[Exception, types.TracebackType]] = [] def done(future, index): if future.is_success(): results[index] = future.result() else: if not errors: # We are fine with this check-then-act. # At most, we will end up with couple of # errors stored in case of the concurrent calls. # The idea behind this check is to try to minimize # the number of errors we store without # synchronization, as we only need the first error. errors.append((future.exception(), future.traceback())) if count == completed.increment_and_get(): if errors: first_exception, first_traceback = errors[0] combined.set_exception(first_exception, first_traceback) else: combined.set_result(results) def make_callback(index): return lambda f: done(f, index) for i, future in enumerate(futures): future.add_done_callback(make_callback(i)) return combined