import typing
from hazelcast.errors import IllegalStateError
from hazelcast.protocol.codec import (
queue_add_all_codec,
queue_add_listener_codec,
queue_clear_codec,
queue_compare_and_remove_all_codec,
queue_compare_and_retain_all_codec,
queue_contains_all_codec,
queue_contains_codec,
queue_drain_to_max_size_codec,
queue_is_empty_codec,
queue_iterator_codec,
queue_offer_codec,
queue_peek_codec,
queue_poll_codec,
queue_put_codec,
queue_remaining_capacity_codec,
queue_remove_codec,
queue_remove_listener_codec,
queue_size_codec,
queue_take_codec,
)
from hazelcast.internal.asyncio_proxy.base import (
PartitionSpecificProxy,
ItemEvent,
ItemEventType,
)
from hazelcast.types import ItemType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, to_millis, deserialize_list_in_place
[docs]
class Queue(PartitionSpecificProxy, typing.Generic[ItemType]):
"""Concurrent, blocking, distributed, observable queue.
Queue is not a partitioned data-structure. All of the Queue content is
stored in a single machine (and in the backup). Queue will not scale by
adding more members in the cluster.
Example:
>>> my_queue = await client.get_queue("my_queue")
>>> print("queue.offer", await my_queue.offer("item"))
>>> print("queue.size", await my_queue.size())
Warning:
Asyncio client queue proxy is not thread-safe, do not access it from other threads.
"""
[docs]
async def add(self, item: ItemType) -> bool:
"""Adds the specified item to this queue if there is available space.
Args:
item: The specified item.
Returns:
``True`` if element is successfully added, ``False`` otherwise.
Raises:
IllegalStateError: If queue is full.
"""
if not await self.offer(item):
raise IllegalStateError("Queue is full!")
return True
[docs]
async def add_all(self, items: typing.Sequence[ItemType]) -> bool:
"""Adds the elements in the specified collection to this queue.
Args:
items: Collection which includes the items to be added.
Returns:
``True`` if this queue is changed after call, ``False`` otherwise.
"""
check_not_none(items, "Value can't be None")
try:
data_items = []
for item in items:
check_not_none(item, "Value can't be None")
data_items.append(self._to_data(item))
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.add_all, items)
request = queue_add_all_codec.encode_request(self.name, data_items)
return await self._invoke(request, queue_add_all_codec.decode_response)
[docs]
async def add_listener(
self,
include_value: bool = False,
item_added_func: typing.Callable[[ItemEvent[ItemType]], None] = None,
item_removed_func: typing.Callable[[ItemEvent[ItemType]], None] = None,
) -> str:
"""Adds an item listener for this queue. Listener will be notified for
all queue add/remove events.
Args:
include_value: Whether received events include the updated item or
not.
item_added_func: Function to be called when an item is added to
this queue.
item_removed_func: Function to be called when an item is deleted
from this queue.
Returns:
A registration id which is used as a key to remove the listener.
"""
codec = queue_add_listener_codec
request = codec.encode_request(self.name, include_value, self._is_smart)
def handle_event_item(item_data, uuid, event_type):
item = self._to_object(item_data) if include_value else None
member = self._context.cluster_service.get_member(uuid)
item_event = ItemEvent(self.name, item, event_type, member)
if event_type == ItemEventType.ADDED:
if item_added_func:
item_added_func(item_event)
else:
if item_removed_func:
item_removed_func(item_event)
return await self._register_listener(
request,
lambda r: codec.decode_response(r),
lambda reg_id: queue_remove_listener_codec.encode_request(self.name, reg_id),
lambda m: codec.handle(m, handle_event_item),
)
[docs]
async def clear(self) -> None:
"""Clears this queue. Queue will be empty after this call."""
request = queue_clear_codec.encode_request(self.name)
return await self._invoke(request)
[docs]
async def contains(self, item: ItemType) -> bool:
"""Determines whether this queue contains the specified item or not.
Args:
item: The specified item to be searched.
Returns:
``True`` if the specified item exists in this queue, ``False``
otherwise.
"""
check_not_none(item, "Item can't be None")
try:
item_data = self._to_data(item)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.contains, item)
request = queue_contains_codec.encode_request(self.name, item_data)
return await self._invoke(request, queue_contains_codec.decode_response)
[docs]
async def contains_all(self, items: typing.Sequence[ItemType]) -> bool:
"""Determines whether this queue contains all of the items in the
specified collection or not.
Args:
items: The specified collection which includes the items to be
searched.
Returns:
``True`` if all of the items in the specified collection exist in
this queue, ``False`` otherwise.
"""
check_not_none(items, "Items can't be None")
try:
data_items = []
for item in items:
check_not_none(item, "item can't be None")
data_items.append(self._to_data(item))
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.contains_all, items)
request = queue_contains_all_codec.encode_request(self.name, data_items)
return await self._invoke(request, queue_contains_all_codec.decode_response)
[docs]
async def drain_to(self, target_list: typing.List[ItemType], max_size: int = -1) -> int:
"""Transfers all available items to the given `target_list` and removes
these items from this queue.
If a max_size is specified, it transfers at most the given number of
items. In case of a failure, an item can exist in both collections or
none of them.
This operation may be more efficient than polling elements repeatedly
and putting into collection.
Args:
target_list: the list where the items in this queue will be
transferred.
max_size: The maximum number items to transfer.
Returns:
Number of transferred items.
"""
def handler(message):
response = queue_drain_to_max_size_codec.decode_response(message)
items = [self._to_object(item) for item in response]
target_list.extend(items)
return len(response)
request = queue_drain_to_max_size_codec.encode_request(self.name, max_size)
return await self._invoke(request, handler)
[docs]
async def iterator(self) -> typing.List[ItemType]:
"""Returns all the items in this queue.
Returns:
Collection of items in this queue.
"""
def handler(message):
data_list = queue_iterator_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)
request = queue_iterator_codec.encode_request(self.name)
return await self._invoke(request, handler)
[docs]
async def is_empty(self) -> bool:
"""Determines whether this queue is empty or not.
Returns:
``True`` if this queue is empty, ``False`` otherwise.
"""
request = queue_is_empty_codec.encode_request(self.name)
return await self._invoke(request, queue_is_empty_codec.decode_response)
[docs]
async def offer(self, item: ItemType, timeout: float = 0) -> bool:
"""Inserts the specified element into this queue if it is possible to
do so immediately without violating capacity restrictions.
If there is no space currently available:
- If the timeout is provided, it waits until this timeout elapses
and returns the result.
- If the timeout is not provided, returns ``False`` immediately.
Args:
item: The item to be added.
timeout: Maximum time in seconds to wait for addition.
Returns:
``True`` if the element was added to this queue, ``False``
otherwise.
"""
check_not_none(item, "Value can't be None")
try:
element_data = self._to_data(item)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.offer, item, timeout)
request = queue_offer_codec.encode_request(self.name, element_data, to_millis(timeout))
return await self._invoke(request, queue_offer_codec.decode_response)
[docs]
async def peek(self) -> typing.Optional[ItemType]:
"""Retrieves the head of queue without removing it from the queue.
Returns:
The head of this queue, or ``None`` if this queue is empty.
"""
def handler(message):
return self._to_object(queue_peek_codec.decode_response(message))
request = queue_peek_codec.encode_request(self.name)
return await self._invoke(request, handler)
[docs]
async def poll(self, timeout: float = 0) -> typing.Optional[ItemType]:
"""Retrieves and removes the head of this queue.
If this queue is empty:
- If the timeout is provided, it waits until this timeout elapses
and returns the result.
- If the timeout is not provided, returns ``None``.
Args:
timeout: Maximum time in seconds to wait for addition.
Returns:
The head of this queue, or ``None`` if this queue is empty or
specified timeout elapses before an item is added to the queue.
"""
def handler(message):
return self._to_object(queue_poll_codec.decode_response(message))
request = queue_poll_codec.encode_request(self.name, to_millis(timeout))
return await self._invoke(request, handler)
[docs]
async def put(self, item: ItemType) -> None:
"""Adds the specified element into this queue.
If there is no space, it waits until necessary space becomes available.
Args:
item: The specified item.
"""
check_not_none(item, "Value can't be None")
try:
element_data = self._to_data(item)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.put, item)
request = queue_put_codec.encode_request(self.name, element_data)
return await self._invoke(request)
[docs]
async def remaining_capacity(self) -> int:
"""Returns the remaining capacity of this queue.
Returns:
Remaining capacity of this queue.
"""
request = queue_remaining_capacity_codec.encode_request(self.name)
return await self._invoke(request, queue_remaining_capacity_codec.decode_response)
[docs]
async def remove(self, item: ItemType) -> bool:
"""Removes the specified element from the queue if it exists.
Args:
item: The specified element to be removed.
Returns:
``True`` if the specified element exists in this queue, ``False``
otherwise.
"""
check_not_none(item, "Value can't be None")
try:
item_data = self._to_data(item)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.remove, item)
request = queue_remove_codec.encode_request(self.name, item_data)
return await self._invoke(request, queue_remove_codec.decode_response)
[docs]
async def remove_all(self, items: typing.Sequence[ItemType]) -> bool:
"""Removes all of the elements of the specified collection from this
queue.
Args:
items: The specified collection.
Returns:
``True`` if the call changed this queue, ``False`` otherwise.
"""
check_not_none(items, "Value can't be None")
try:
data_items = []
for item in items:
check_not_none(item, "Value can't be None")
data_items.append(self._to_data(item))
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.remove_all, items)
request = queue_compare_and_remove_all_codec.encode_request(self.name, data_items)
return await self._invoke(request, queue_compare_and_remove_all_codec.decode_response)
[docs]
async def remove_listener(self, registration_id: str) -> bool:
"""Removes the specified item listener.
Returns silently if the specified listener was not added before.
Args:
registration_id: Id of the listener to be deleted.
Returns:
``True`` if the item listener is removed, ``False`` otherwise.
"""
return await self._deregister_listener(registration_id)
[docs]
async def retain_all(self, items: typing.Sequence[ItemType]) -> bool:
"""Removes the items which are not contained in the specified
collection.
In other words, only the items that are contained in the specified
collection will be retained.
Args:
items: Collection which includes the elements to be retained in
this queue.
Returns:
``True`` if this queue changed as a result of the call, ``False``
otherwise.
"""
check_not_none(items, "Value can't be None")
try:
data_items = []
for item in items:
check_not_none(item, "Value can't be None")
data_items.append(self._to_data(item))
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.retain_all, items)
request = queue_compare_and_retain_all_codec.encode_request(self.name, data_items)
return await self._invoke(request, queue_compare_and_retain_all_codec.decode_response)
[docs]
async def size(self) -> int:
"""Returns the number of elements in this collection.
If the size is greater than ``2**31 - 1``, it returns ``2**31 - 1``.
Returns:
Size of the queue.
"""
request = queue_size_codec.encode_request(self.name)
return await self._invoke(request, queue_size_codec.decode_response)
[docs]
async def take(self) -> ItemType:
"""Retrieves and removes the head of this queue, if necessary, waits
until an item becomes available.
Returns:
The head of this queue.
"""
def handler(message):
return self._to_object(queue_take_codec.decode_response(message))
request = queue_take_codec.encode_request(self.name)
return await self._invoke(request, handler)
async def create_queue_proxy(service_name, name, context):
return Queue(service_name, name, context)