RingBuffer

OVERFLOW_POLICY_OVERWRITE = 0

Configuration property for DEFAULT overflow policy. When an item is tried to be added on full Ringbuffer, oldest item in the Ringbuffer is overwritten and item is added.

OVERFLOW_POLICY_FAIL = 1

Configuration property for overflow policy. When an item is tried to be added on full Ringbuffer, the call fails and item is not added.

The reason that FAIL exist is to give the opportunity to obey the ttl. If blocking behavior is required, this can be implemented using retrying in combination with an exponential backoff.

>>> sleepMS = 100;
>>> while true:
>>>     result = ringbuffer.add(item, -1)
>>>     if result != -1:
>>>         break
>>>     sleep(sleepMS / 1000)
>>>     sleepMS *= 2
MAX_BATCH_SIZE = 1000

The maximum number of items to be added to RingBuffer or read from RingBuffer at a time.

class ReadResult(read_count, next_seq, item_seqs, items)[source]

Bases: Sequence

Defines the result of a Ringbuffer.read_many() operation.

SEQUENCE_UNAVAILABLE = -1

Value returned from methods returning a sequence number when the information is not available (e.g. because of rolling upgrade and some members not returning the sequence).

property read_count: int

The number of items that have been read before filtering.

If no filter is set, then the read_count will be equal to size.

But if a filter is applied, it could be that items are read, but are filtered out. So, if you are trying to make another read based on this, then you should increment the sequence by read_count and not by size.

Otherwise, you will be re-reading the same filtered messages.

property size: int

The result set size.

See also

read_count

property next_sequence_to_read_from: int

The sequence of the item following the last read item.

This sequence can then be used to read items following the ones returned by this result set.

Usually this sequence is equal to the sequence used to retrieve this result set incremented by the read_count. In cases when the reader tolerates lost items, this is not the case.

For instance, if the reader requests an item with a stale sequence (one which has already been overwritten), the read will jump to the oldest sequence and read from there.

Similarly, if the reader requests an item in the future (e.g. because the partition was lost and the reader was unaware of this), the read method will jump back to the newest available sequence.

Because of these jumps and only in the case when the reader is loss tolerant, the next sequence must be retrieved using this method. A return value of SEQUENCE_UNAVAILABLE means that the information is not available.

get_sequence(index: int) int[source]

Return the sequence number for the item at the given index.

Parameters:

index – The index.

Returns:

The sequence number for the ringbuffer item.

class Ringbuffer(service_name, name, context)[source]

Bases: PartitionSpecificProxy[BlockingRingbuffer], Generic[ItemType]

A Ringbuffer is an append-only data-structure where the content is stored in a ring like structure.

A ringbuffer has a capacity so it won’t grow beyond that capacity and endanger the stability of the system. If that capacity is exceeded, than the oldest item in the ringbuffer is overwritten. The ringbuffer has two always incrementing sequences:

  • tail_sequence(): This is the side where the youngest item is found. So the tail is the side of the ringbuffer where items are added to.

  • head_sequence(): This is the side where the oldest items are found. So the head is the side where items gets discarded.

The items in the ringbuffer can be found by a sequence that is in between (inclusive) the head and tail sequence.

If data is read from a ringbuffer with a sequence that is smaller than the head sequence, it means that the data is not available anymore and a hazelcast.errors.StaleSequenceError is thrown.

A Ringbuffer currently is a replicated, but not partitioned data structure. So all data is stored in a single partition, similarly to the hazelcast.proxy.queue.Queue implementation.

A Ringbuffer can be used in a way similar to the Queue, but one of the key differences is that a hazelcast.proxy.queue.Queue.take() is destructive, meaning that only 1 thread is able to take an item. A read_one() is not destructive, so you can have multiple threads reading the same item multiple times.

capacity() Future[int][source]

Returns the capacity of this Ringbuffer.

Returns:

The capacity of Ringbuffer.

size() Future[int][source]

Returns number of items in the Ringbuffer.

Returns:

The size of Ringbuffer.

tail_sequence() Future[int][source]

Returns the sequence of the tail.

The tail is the side of the Ringbuffer where the items are added to. The initial value of the tail is -1.

Returns:

The sequence of the tail.

head_sequence() Future[int][source]

Returns the sequence of the head.

The head is the side of the Ringbuffer where the oldest items in the Ringbuffer are found. If the Ringbuffer is empty, the head will be one more than the tail. The initial value of the head is 0 (1 more than tail).

Returns:

The sequence of the head.

remaining_capacity() Future[int][source]

Returns the remaining capacity of the Ringbuffer.

Returns:

The remaining capacity of Ringbuffer.

add(item, overflow_policy: int = 0) Future[int][source]

Adds the specified item to the tail of the Ringbuffer.

If there is no space in the Ringbuffer, the action is determined by overflow_policy.

Parameters:
  • item – The specified item to be added.

  • overflow_policy – the OverflowPolicy to be used when there is no space.

Returns:

The sequenceId of the added item, or -1 if the add failed.

add_all(items: Sequence[ItemType], overflow_policy: int = 0) Future[int][source]

Adds all of the item in the specified collection to the tail of the Ringbuffer.

This is likely to outperform multiple calls to add() due to better io utilization and a reduced number of executed operations. The items are added in the order of the Iterator of the collection.

If there is no space in the Ringbuffer, the action is determined by overflow_policy.

Parameters:
  • items – The specified collection which contains the items to be added.

  • overflow_policy – The OverflowPolicy to be used when there is no space.

Returns:

The sequenceId of the last written item, or -1 of the last write is failed.

read_one(sequence: int) Future[ItemType][source]

Reads one item from the Ringbuffer.

If the sequence is one beyond the current tail, this call blocks until an item is added. Currently it isn’t possible to control how long this call is going to block.

Parameters:

sequence – The sequence of the item to read.

Returns:

The read item.

read_many(start_sequence: int, min_count: int, max_count: int, filter: Optional[Any] = None) Future[ReadResult][source]

Reads a batch of items from the Ringbuffer.

If the number of available items after the first read item is smaller than the max_count, these items are returned. So it could be the number of items read is smaller than the max_count. If there are less items available than min_count, then this call blocks.

Warning

These blocking calls consume server memory and if there are many calls, it can be possible to see leaking memory or OutOfMemoryError s on the server.

Reading a batch of items is likely to perform better because less overhead is involved.

A filter can be provided to only select items that need to be read. If the filter is None, all items are read. If the filter is not None, only items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement. Note that, filtering logic must be defined on the server-side.

If the start_sequence is smaller than the smallest sequence still available in the Ringbuffer (head_sequence()), then the smallest available sequence will be used as the start sequence and the minimum/maximum number of items will be attempted to be read from there on.

If the start_sequence is bigger than the last available sequence in the Ringbuffer (tail_sequence()), then the last available sequence plus one will be used as the start sequence and the call will block until further items become available and it can read at least the minimum number of items.

Parameters:
  • start_sequence – The start sequence of the first item to read.

  • min_count – The minimum number of items to read.

  • max_count – The maximum number of items to read.

  • filter – Filter to select returned elements.

Returns:

The list of read items.

blocking() BlockingRingbuffer[ItemType][source]

Returns a version of this proxy with only blocking method calls.

class BlockingRingbuffer(wrapped: Ringbuffer[ItemType])[source]

Bases: Ringbuffer[ItemType]

name
service_name
capacity() int[source]

Returns the capacity of this Ringbuffer.

Returns:

The capacity of Ringbuffer.

size() int[source]

Returns number of items in the Ringbuffer.

Returns:

The size of Ringbuffer.

tail_sequence() int[source]

Returns the sequence of the tail.

The tail is the side of the Ringbuffer where the items are added to. The initial value of the tail is -1.

Returns:

The sequence of the tail.

head_sequence() int[source]

Returns the sequence of the head.

The head is the side of the Ringbuffer where the oldest items in the Ringbuffer are found. If the Ringbuffer is empty, the head will be one more than the tail. The initial value of the head is 0 (1 more than tail).

Returns:

The sequence of the head.

remaining_capacity() int[source]

Returns the remaining capacity of the Ringbuffer.

Returns:

The remaining capacity of Ringbuffer.

add(item, overflow_policy: int = 0) int[source]

Adds the specified item to the tail of the Ringbuffer.

If there is no space in the Ringbuffer, the action is determined by overflow_policy.

Parameters:
  • item – The specified item to be added.

  • overflow_policy – the OverflowPolicy to be used when there is no space.

Returns:

The sequenceId of the added item, or -1 if the add failed.

add_all(items: Sequence[ItemType], overflow_policy: int = 0) int[source]

Adds all of the item in the specified collection to the tail of the Ringbuffer.

This is likely to outperform multiple calls to add() due to better io utilization and a reduced number of executed operations. The items are added in the order of the Iterator of the collection.

If there is no space in the Ringbuffer, the action is determined by overflow_policy.

Parameters:
  • items – The specified collection which contains the items to be added.

  • overflow_policy – The OverflowPolicy to be used when there is no space.

Returns:

The sequenceId of the last written item, or -1 of the last write is failed.

read_one(sequence: int) ItemType[source]

Reads one item from the Ringbuffer.

If the sequence is one beyond the current tail, this call blocks until an item is added. Currently it isn’t possible to control how long this call is going to block.

Parameters:

sequence – The sequence of the item to read.

Returns:

The read item.

read_many(start_sequence: int, min_count: int, max_count: int, filter: Optional[Any] = None) ReadResult[source]

Reads a batch of items from the Ringbuffer.

If the number of available items after the first read item is smaller than the max_count, these items are returned. So it could be the number of items read is smaller than the max_count. If there are less items available than min_count, then this call blocks.

Warning

These blocking calls consume server memory and if there are many calls, it can be possible to see leaking memory or OutOfMemoryError s on the server.

Reading a batch of items is likely to perform better because less overhead is involved.

A filter can be provided to only select items that need to be read. If the filter is None, all items are read. If the filter is not None, only items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement. Note that, filtering logic must be defined on the server-side.

If the start_sequence is smaller than the smallest sequence still available in the Ringbuffer (head_sequence()), then the smallest available sequence will be used as the start sequence and the minimum/maximum number of items will be attempted to be read from there on.

If the start_sequence is bigger than the last available sequence in the Ringbuffer (tail_sequence()), then the last available sequence plus one will be used as the start sequence and the call will block until further items become available and it can read at least the minimum number of items.

Parameters:
  • start_sequence – The start sequence of the first item to read.

  • min_count – The minimum number of items to read.

  • max_count – The maximum number of items to read.

  • filter – Filter to select returned elements.

Returns:

The list of read items.

destroy() bool[source]

Destroys this proxy.

Returns:

True if this proxy is destroyed successfully, False otherwise.

blocking() BlockingRingbuffer[ItemType][source]

Returns a version of this proxy with only blocking method calls.