Source code for hazelcast.proxy.transactional_queue

import typing

from hazelcast.protocol.codec import (
    transactional_queue_offer_codec,
    transactional_queue_peek_codec,
    transactional_queue_poll_codec,
    transactional_queue_size_codec,
    transactional_queue_take_codec,
)
from hazelcast.proxy.base import TransactionalProxy
from hazelcast.types import ItemType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, to_millis, thread_id


[docs]class TransactionalQueue(TransactionalProxy, typing.Generic[ItemType]): """Transactional implementation of :class:`~hazelcast.proxy.queue.Queue`."""
[docs] def offer(self, item: ItemType, timeout: float = 0) -> bool: """Transactional implementation of :func:`Queue.offer(item, timeout) <hazelcast.proxy.queue.Queue.offer>` Args: item: The item to be added. timeout: Maximum time in seconds to wait for addition. Returns: ``True`` if the element was added to this queue, ``False`` otherwise. """ check_not_none(item, "item can't be none") try: item_data = self._to_data(item) except SchemaNotReplicatedError as e: self._send_schema(e) return self.offer(item, timeout) request = transactional_queue_offer_codec.encode_request( self.name, self.transaction.id, thread_id(), item_data, to_millis(timeout) ) return self._invoke(request, transactional_queue_offer_codec.decode_response)
[docs] def take(self) -> ItemType: """Transactional implementation of :func:`Queue.take() <hazelcast.proxy.queue.Queue.take>` Returns: The head of this queue. """ def handler(message): return self._to_object(transactional_queue_take_codec.decode_response(message)) request = transactional_queue_take_codec.encode_request( self.name, self.transaction.id, thread_id() ) return self._invoke(request, handler)
[docs] def poll(self, timeout: float = 0) -> typing.Optional[ItemType]: """Transactional implementation of :func:`Queue.poll(timeout) <hazelcast.proxy.queue.Queue.poll>` Args: timeout: Maximum time in seconds to wait for addition. Returns: The head of this queue, or ``None`` if this queue is empty or specified timeout elapses before an item is added to the queue. """ def handler(message): return self._to_object(transactional_queue_poll_codec.decode_response(message)) request = transactional_queue_poll_codec.encode_request( self.name, self.transaction.id, thread_id(), to_millis(timeout) ) return self._invoke(request, handler)
[docs] def peek(self, timeout: float = 0) -> typing.Optional[ItemType]: """Transactional implementation of :func:`Queue.peek(timeout) <hazelcast.proxy.queue.Queue.peek>` Args: timeout: Maximum time in seconds to wait for addition. Returns: The head of this queue, or ``None`` if this queue is empty or specified timeout elapses before an item is added to the queue. """ def handler(message): return self._to_object(transactional_queue_peek_codec.decode_response(message)) request = transactional_queue_peek_codec.encode_request( self.name, self.transaction.id, thread_id(), to_millis(timeout) ) return self._invoke(request, handler)
[docs] def size(self) -> int: """Transactional implementation of :func:`Queue.size() <hazelcast.proxy.queue.Queue.size>` Returns: Size of the queue. """ request = transactional_queue_size_codec.encode_request( self.name, self.transaction.id, thread_id() ) return self._invoke(request, transactional_queue_size_codec.decode_response)