Queue

class Queue(service_name, name, context)[source]

Bases: PartitionSpecificProxy[BlockingQueue], Generic[ItemType]

Concurrent, blocking, distributed, observable queue.

Queue is not a partitioned data structure. All of the Queue content is stored on a single machine (and in the backup), so Queue does not scale out when more members are added to the cluster.

add(item: ItemType) Future[bool][source]

Adds the specified item to this queue if there is available space.

Parameters:

item – The specified item.

Returns:

True if the element is successfully added, False otherwise.

add_all(items: Sequence[ItemType]) Future[bool][source]

Adds the elements in the specified collection to this queue.

Parameters:

items – Collection which includes the items to be added.

Returns:

True if this queue changed as a result of the call, False otherwise.

add_listener(include_value: bool = False, item_added_func: Optional[Callable[[ItemEvent[ItemType]], None]] = None, item_removed_func: Optional[Callable[[ItemEvent[ItemType]], None]] = None) Future[str][source]

Adds an item listener for this queue. Listener will be notified for all queue add/remove events.

Parameters:
  • include_value – Whether received events include the updated item or not.

  • item_added_func – Function to be called when an item is added to this queue.

  • item_removed_func – Function to be called when an item is deleted from this queue.

Returns:

A registration id which is used as a key to remove the listener.
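
A minimal sketch of the two callbacks follows. The handler names are hypothetical, and reading the item from `event.item` is an assumption about the event object; a `SimpleNamespace` stands in for a real event so that the snippet runs without a cluster.

```python
from types import SimpleNamespace

added, removed = [], []


def on_item_added(event):
    # Assumption: the event exposes the item as `event.item`, which is
    # only meaningful when the listener is registered with include_value=True.
    added.append(event.item)


def on_item_removed(event):
    removed.append(event.item)


# Stand-in events, used here instead of events delivered by a cluster.
on_item_added(SimpleNamespace(item="job-1"))
on_item_removed(SimpleNamespace(item="job-1"))

# With a real Queue proxy (not executed here), registration would look like:
# registration_id = queue.add_listener(
#     include_value=True,
#     item_added_func=on_item_added,
#     item_removed_func=on_item_removed,
# ).result()
```

The returned registration id is the value later passed to `remove_listener`.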

clear() Future[None][source]

Clears this queue. Queue will be empty after this call.

contains(item: ItemType) Future[bool][source]

Determines whether this queue contains the specified item or not.

Parameters:

item – The specified item to be searched.

Returns:

True if the specified item exists in this queue, False otherwise.

contains_all(items: Sequence[ItemType]) Future[bool][source]

Determines whether this queue contains all of the items in the specified collection or not.

Parameters:

items – The specified collection which includes the items to be searched.

Returns:

True if all of the items in the specified collection exist in this queue, False otherwise.

drain_to(target_list: List[ItemType], max_size: int = -1) Future[int][source]

Transfers all available items to the given target_list and removes these items from this queue.

If a max_size is specified, it transfers at most the given number of items. In case of a failure, an item can exist in both collections or in neither of them.

This operation may be more efficient than polling elements repeatedly and adding them to a collection.

Parameters:
  • target_list – The list to which the items in this queue will be transferred.

  • max_size – The maximum number of items to transfer.

Returns:

Number of transferred items.
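
As an illustration of the `max_size` parameter, the sketch below drains a queue in fixed-size batches. `FakeBlockingQueue` and `drain_in_batches` are hypothetical names: the fake is an in-memory stand-in that mimics only the documented `drain_to` contract, so the example runs without a cluster.

```python
class FakeBlockingQueue:
    """Hypothetical in-memory stand-in exposing only drain_to()."""

    def __init__(self, items):
        self._items = list(items)

    def drain_to(self, target_list, max_size=-1):
        # A negative max_size means "no limit", mirroring the documented default.
        available = len(self._items)
        count = available if max_size < 0 else min(max_size, available)
        target_list.extend(self._items[:count])
        del self._items[:count]
        return count


def drain_in_batches(queue, batch_size):
    """Collect batches of at most batch_size items until nothing is left."""
    batches = []
    while True:
        batch = []
        if queue.drain_to(batch, batch_size) == 0:
            break
        batches.append(batch)
    return batches


batches = drain_in_batches(FakeBlockingQueue(range(5)), 2)
```

With a real proxy, the same helper would be called on the object returned by `blocking()`, since it relies on `drain_to` returning an `int` rather than a future.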

iterator() Future[List[ItemType]][source]

Returns all the items in this queue.

Returns:

Collection of items in this queue.

is_empty() Future[bool][source]

Determines whether this queue is empty or not.

Returns:

True if this queue is empty, False otherwise.

offer(item: ItemType, timeout: float = 0) Future[bool][source]

Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions.

If there is no space currently available:

  • If the timeout is provided, it waits until this timeout elapses and returns the result.

  • If the timeout is not provided, returns False immediately.

Parameters:
  • item – The item to be added.

  • timeout – Maximum time in seconds to wait for addition.

Returns:

True if the element was added to this queue, False otherwise.
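
The timeout behavior described above can be sketched with a local analogy. The `offer` function below is a hypothetical helper built on the standard library's `queue.Queue`, not the client's implementation; it only mirrors the documented outcomes for a bounded queue.

```python
import queue


def offer(local_queue, item, timeout=0):
    """Return True if the item was enqueued, False if the queue stayed full."""
    try:
        if timeout > 0:
            # Wait up to `timeout` seconds for free space.
            local_queue.put(item, block=True, timeout=timeout)
        else:
            # No timeout: fail immediately when the queue is full.
            local_queue.put_nowait(item)
        return True
    except queue.Full:
        return False


bounded = queue.Queue(maxsize=1)
first = offer(bounded, "a")                 # space available
second = offer(bounded, "b")                # full, no timeout
third = offer(bounded, "c", timeout=0.05)   # full, waits briefly, then gives up
```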

peek() Future[Optional[ItemType]][source]

Retrieves the head of this queue without removing it from the queue.

Returns:

The head of this queue, or None if this queue is empty.

poll(timeout: float = 0) Future[Optional[ItemType]][source]

Retrieves and removes the head of this queue.

If this queue is empty:

  • If the timeout is provided, it waits until this timeout elapses and returns the result.

  • If the timeout is not provided, returns None.

Parameters:

timeout – Maximum time in seconds to wait for an item to become available.

Returns:

The head of this queue, or None if this queue is empty or specified timeout elapses before an item is added to the queue.
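
A common use of the timeout is a consumer loop that stops once the queue has been idle for a while. The sketch below assumes a queue whose `poll` returns the item directly (as on the blocking variant); `FakeBlockingQueue` and `consume_until_idle` are hypothetical names, and the fake never actually waits.

```python
from collections import deque


class FakeBlockingQueue:
    """Hypothetical in-memory stand-in exposing only poll()."""

    def __init__(self, items):
        self._items = deque(items)

    def poll(self, timeout=0):
        # The stand-in ignores the timeout and returns None when empty.
        return self._items.popleft() if self._items else None


def consume_until_idle(queue, handle, idle_timeout=1.0):
    """Process items until poll() reports nothing within idle_timeout seconds."""
    processed = 0
    while True:
        item = queue.poll(idle_timeout)
        if item is None:
            # Treated as "queue empty or timeout elapsed".
            return processed
        handle(item)
        processed += 1


seen = []
count = consume_until_idle(FakeBlockingQueue([1, 2, 3]), seen.append)
```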

put(item: ItemType) Future[None][source]

Adds the specified element into this queue.

If there is no space, it waits until necessary space becomes available.

Parameters:

item – The specified item.

remaining_capacity() Future[int][source]

Returns the remaining capacity of this queue.

Returns:

Remaining capacity of this queue.

remove(item: ItemType) Future[bool][source]

Removes the specified element from the queue if it exists.

Parameters:

item – The specified element to be removed.

Returns:

True if the specified element existed in this queue and was removed, False otherwise.

remove_all(items: Sequence[ItemType]) Future[bool][source]

Removes all of the elements of the specified collection from this queue.

Parameters:

items – The specified collection.

Returns:

True if the call changed this queue, False otherwise.

remove_listener(registration_id: str) Future[bool][source]

Removes the specified item listener.

Returns silently if the specified listener was not added before.

Parameters:

registration_id – Id of the listener to be deleted.

Returns:

True if the item listener is removed, False otherwise.

retain_all(items: Sequence[ItemType]) Future[bool][source]

Removes the items which are not contained in the specified collection.

In other words, only the items that are contained in the specified collection will be retained.

Parameters:

items – Collection which includes the elements to be retained in this queue.

Returns:

True if this queue changed as a result of the call, False otherwise.
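
The retain semantics can be modeled locally on a plain list. This is only a sketch of the documented behavior with a hypothetical helper name, not the client's implementation.

```python
def retain_all(current, items):
    """Return (retained, changed) for a local list model of the queue."""
    # Keep the original order and only the elements present in `items`.
    retained = [element for element in current if element in items]
    changed = len(retained) != len(current)
    return retained, changed


kept, changed = retain_all([1, 2, 3, 2], [2, 4])
unchanged_result = retain_all([1, 2], [1, 2, 3])
```

In the second call every element is already in the given collection, so nothing is removed and the changed flag is False.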

size() Future[int][source]

Returns the number of elements in this queue.

If the size is greater than 2**31 - 1, it returns 2**31 - 1.

Returns:

Size of the queue.
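
The cap amounts to clamping the real element count to the largest signed 32-bit integer. A small worked example, with a hypothetical helper name:

```python
INT32_MAX = 2**31 - 1  # 2147483647


def reported_size(actual_count):
    """Model of the value size() reports for a given real element count."""
    return min(actual_count, INT32_MAX)


small = reported_size(10)
capped = reported_size(2**31 + 5)
```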

take() Future[ItemType][source]

Retrieves and removes the head of this queue. If necessary, waits until an item becomes available.

Returns:

The head of this queue.

blocking() BlockingQueue[ItemType][source]

Returns a version of this proxy with only blocking method calls.
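
The difference between the two calling styles can be sketched with stand-ins. `FakeQueue` and `FakeBlockingQueue` are hypothetical classes, and `concurrent.futures.Future` is used here only as a substitute for the client's own future type, on the assumption that both offer `result()` and `add_done_callback()`.

```python
from concurrent.futures import Future


class FakeQueue:
    """Hypothetical stand-in whose size() returns a future."""

    def __init__(self, items):
        self._items = list(items)

    def size(self):
        future = Future()
        future.set_result(len(self._items))
        return future

    def blocking(self):
        return FakeBlockingQueue(self)


class FakeBlockingQueue:
    """Hypothetical stand-in that resolves each future before returning."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def size(self):
        return self._wrapped.size().result()


q = FakeQueue(["a", "b"])

# Non-blocking style: wait on the future explicitly, or attach a callback.
via_future = q.size().result()
via_callback = []
q.size().add_done_callback(lambda f: via_callback.append(f.result()))

# Blocking style: the value is returned directly.
via_blocking = q.blocking().size()
```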

class BlockingQueue(wrapped: Queue[ItemType])[source]

Bases: Queue[ItemType]

name
service_name
add(item: ItemType) bool[source]

Adds the specified item to this queue if there is available space.

Parameters:

item – The specified item.

Returns:

True if the element is successfully added, False otherwise.

add_all(items: Sequence[ItemType]) bool[source]

Adds the elements in the specified collection to this queue.

Parameters:

items – Collection which includes the items to be added.

Returns:

True if this queue changed as a result of the call, False otherwise.

add_listener(include_value: bool = False, item_added_func: Optional[Callable[[ItemEvent[ItemType]], None]] = None, item_removed_func: Optional[Callable[[ItemEvent[ItemType]], None]] = None) str[source]

Adds an item listener for this queue. Listener will be notified for all queue add/remove events.

Parameters:
  • include_value – Whether received events include the updated item or not.

  • item_added_func – Function to be called when an item is added to this queue.

  • item_removed_func – Function to be called when an item is deleted from this queue.

Returns:

A registration id which is used as a key to remove the listener.

clear() None[source]

Clears this queue. Queue will be empty after this call.

contains(item: ItemType) bool[source]

Determines whether this queue contains the specified item or not.

Parameters:

item – The specified item to be searched.

Returns:

True if the specified item exists in this queue, False otherwise.

contains_all(items: Sequence[ItemType]) bool[source]

Determines whether this queue contains all of the items in the specified collection or not.

Parameters:

items – The specified collection which includes the items to be searched.

Returns:

True if all of the items in the specified collection exist in this queue, False otherwise.

drain_to(target_list: List[ItemType], max_size: int = -1) int[source]

Transfers all available items to the given target_list and removes these items from this queue.

If a max_size is specified, it transfers at most the given number of items. In case of a failure, an item can exist in both collections or in neither of them.

This operation may be more efficient than polling elements repeatedly and adding them to a collection.

Parameters:
  • target_list – The list to which the items in this queue will be transferred.

  • max_size – The maximum number of items to transfer.

Returns:

Number of transferred items.

iterator() List[ItemType][source]

Returns all the items in this queue.

Returns:

Collection of items in this queue.

is_empty() bool[source]

Determines whether this queue is empty or not.

Returns:

True if this queue is empty, False otherwise.

offer(item: ItemType, timeout: float = 0) bool[source]

Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions.

If there is no space currently available:

  • If the timeout is provided, it waits until this timeout elapses and returns the result.

  • If the timeout is not provided, returns False immediately.

Parameters:
  • item – The item to be added.

  • timeout – Maximum time in seconds to wait for addition.

Returns:

True if the element was added to this queue, False otherwise.

peek() Optional[ItemType][source]

Retrieves the head of this queue without removing it from the queue.

Returns:

The head of this queue, or None if this queue is empty.

poll(timeout: float = 0) Optional[ItemType][source]

Retrieves and removes the head of this queue.

If this queue is empty:

  • If the timeout is provided, it waits until this timeout elapses and returns the result.

  • If the timeout is not provided, returns None.

Parameters:

timeout – Maximum time in seconds to wait for an item to become available.

Returns:

The head of this queue, or None if this queue is empty or specified timeout elapses before an item is added to the queue.

put(item: ItemType) None[source]

Adds the specified element into this queue.

If there is no space, it waits until necessary space becomes available.

Parameters:

item – The specified item.

remaining_capacity() int[source]

Returns the remaining capacity of this queue.

Returns:

Remaining capacity of this queue.

remove(item: ItemType) bool[source]

Removes the specified element from the queue if it exists.

Parameters:

item – The specified element to be removed.

Returns:

True if the specified element existed in this queue and was removed, False otherwise.

remove_all(items: Sequence[ItemType]) bool[source]

Removes all of the elements of the specified collection from this queue.

Parameters:

items – The specified collection.

Returns:

True if the call changed this queue, False otherwise.

remove_listener(registration_id: str) bool[source]

Removes the specified item listener.

Returns silently if the specified listener was not added before.

Parameters:

registration_id – Id of the listener to be deleted.

Returns:

True if the item listener is removed, False otherwise.

retain_all(items: Sequence[ItemType]) bool[source]

Removes the items which are not contained in the specified collection.

In other words, only the items that are contained in the specified collection will be retained.

Parameters:

items – Collection which includes the elements to be retained in this queue.

Returns:

True if this queue changed as a result of the call, False otherwise.

size() int[source]

Returns the number of elements in this queue.

If the size is greater than 2**31 - 1, it returns 2**31 - 1.

Returns:

Size of the queue.

take() ItemType[source]

Retrieves and removes the head of this queue. If necessary, waits until an item becomes available.

Returns:

The head of this queue.

destroy() bool[source]

Destroys this proxy.

Returns:

True if this proxy is destroyed successfully, False otherwise.

blocking() BlockingQueue[ItemType][source]

Returns a version of this proxy with only blocking method calls.