Class FixedBuffer<T>

  • All Implemented Interfaces:
    java.io.Closeable, java.lang.AutoCloseable, java.lang.Iterable<T>

    public class FixedBuffer<T>
    extends Buffer<T>
    Ring buffer of fixed capacity. Indices low and high point to the beginning and end of the buffer. Sequence numbers (seqnos) are mapped to an index by seqno % capacity. High can never pass low; when an add would make it do so, the element is either dropped or the caller blocks.
    Note that 'null' is not a valid element; it signifies a missing element.
    The design is described in doc/design/FixedBuffer.txt.
    Since:
    5.4
    Author:
    Bela Ban
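The seqno-to-index mapping from the class description can be illustrated with a minimal sketch. The names SeqnoIndexSketch, index and fits are hypothetical and not part of FixedBuffer. The window check assumes that at most capacity seqnos above low can be held, which is one reading of "high can never pass low"; the actual rules are in doc/design/FixedBuffer.txt.

```java
// Hypothetical sketch; not the JGroups implementation.
final class SeqnoIndexSketch {

    // Maps a sequence number to an array slot, as in "seqno % capacity".
    static int index(long seqno, int capacity) {
        return (int) (seqno % capacity);
    }

    // Assumption: a seqno fits only if it lies in (low, low + capacity].
    // Anything higher would map to a slot that may still hold an older element.
    static boolean fits(long seqno, long low, int capacity) {
        return seqno > low && seqno - low <= capacity;
    }

    public static void main(String[] args) {
        int capacity = 8;
        System.out.println(index(13, capacity));     // 5
        System.out.println(index(21, capacity));     // 5 (same slot as seqno 13)
        System.out.println(fits(20, 12, capacity));  // true
        System.out.println(fits(21, 12, capacity));  // false
    }
}
```

Seqnos 13 and 21 share a slot when the capacity is 8, which is why an add past the window has to block or drop rather than overwrite.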
    • Field Detail

      • buf

        protected T[] buf
        Holds the elements
      • buffer_full

        protected final java.util.concurrent.locks.Condition buffer_full
      • open

        protected boolean open
        Used to unblock blocked senders on close(). When false, senders do not block when the buffer is full, but discard the element instead.
      • num_blockings

        protected final java.util.concurrent.atomic.LongAdder num_blockings
      • avg_time_blocked

        protected final AverageMinMax avg_time_blocked
      • num_dropped_msgs

        protected final java.util.concurrent.atomic.LongAdder num_dropped_msgs
        Number of received messages dropped due to full buffer
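The interplay of the fields above (a condition to wait on, an open flag, and counters for blockings and drops) can be sketched as follows. This is an illustration under assumptions, not the FixedBuffer code: the class BlockingGateSketch and its methods acquire and release are hypothetical, the buffer is reduced to a simple slot count, and awaitUninterruptibly() is used only to keep the example free of checked exceptions.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of "block or drop when full"; not the JGroups implementation.
final class BlockingGateSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition bufferFull = lock.newCondition();
    private final LongAdder numBlockings = new LongAdder();
    private final LongAdder numDropped = new LongAdder();
    private final int capacity;
    private int size;
    private boolean open = true;

    BlockingGateSketch(int capacity) {
        this.capacity = capacity;
    }

    // Returns true if a slot was taken, false if the element was dropped.
    boolean acquire(boolean dontBlock) {
        lock.lock();
        try {
            while (size >= capacity) {
                if (dontBlock || !open) {   // full: drop instead of blocking
                    numDropped.increment();
                    return false;
                }
                numBlockings.increment();   // counts each wait, including re-waits
                bufferFull.awaitUninterruptibly();
            }
            size++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Frees one slot and wakes up blocked callers.
    void release() {
        lock.lock();
        try {
            if (size > 0)
                size--;
            bufferFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Closing (b == false) wakes up blocked callers so they can give up.
    void open(boolean b) {
        lock.lock();
        try {
            open = b;
            if (!b)
                bufferFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long numBlockings() { return numBlockings.sum(); }
    long numDropped()   { return numDropped.sum(); }
}
```

Re-checking the condition in a loop after every wake-up covers both spurious wake-ups and the case where the gate was closed while a caller was waiting.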
    • Constructor Detail

      • FixedBuffer

        public FixedBuffer()
      • FixedBuffer

        public FixedBuffer(long offset)
      • FixedBuffer

        public FixedBuffer(int capacity,
                           long offset)
        Creates a FixedBuffer
        Parameters:
        capacity - The number of elements the ring buffer's array should hold.
        offset - The offset. The first element to be added must have seqno offset + 1.
    • Method Detail

      • capacity

        public int capacity()
        Description copied from class: Buffer
        Returns the current capacity of the buffer. This value is fixed in a fixed-size buffer (e.g. FixedBuffer), but can change in a dynamic buffer (e.g. DynamicBuffer).
        Specified by:
        capacity in class Buffer<T>
      • numBlockings

        public long numBlockings()
      • numDroppedMessages

        public long numDroppedMessages()
      • add

        public boolean add(long seqno,
                           T element,
                           java.util.function.Predicate<T> remove_filter,
                           Buffer.Options opts,
                           boolean dont_block)
        Description copied from class: Buffer
        Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.
        Specified by:
        add in class Buffer<T>
        Parameters:
        seqno - The seqno of the element
        element - The element to be added
        remove_filter - A filter used to remove all consecutive non-null messages that pass the filter. This doesn't necessarily null a removed message, but may simply advance an index (e.g. highest delivered). Ignored if null.
        opts - The options passed to the call
        dont_block - If true, don't block when no space is available, but instead drop the element. This parameter is set by calling Message.isFlagSet(DONT_BLOCK)
        Returns:
        True if the element at the computed index was null, else false
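The contract of add (set the slot only if it is null, and report whether it was) can be sketched in a few lines. This is a simplified illustration, not the FixedBuffer code: the class AddSketch is hypothetical, remove_filter and opts are left out, and a seqno outside the window is simply rejected here, whereas the real method would block or drop depending on dont_block.

```java
// Hypothetical sketch of "add if the slot is null"; not the JGroups implementation.
final class AddSketch<T> {
    private final Object[] buf;
    private final long low;   // the constructor offset; the first valid seqno is offset + 1
    private long high;        // highest seqno added so far

    AddSketch(int capacity, long offset) {
        this.buf = new Object[capacity];
        this.low = offset;
        this.high = offset;
    }

    // Returns true if the slot for seqno was empty and has now been set.
    boolean add(long seqno, T element) {
        if (element == null)
            throw new IllegalArgumentException("null is not a valid element");
        // Assumption: only seqnos in (low, low + capacity] are accepted.
        if (seqno <= low || seqno - low > buf.length)
            return false;
        int idx = (int) (seqno % buf.length);
        if (buf[idx] != null)
            return false;     // an element already exists; it is not overwritten
        buf[idx] = element;
        high = Math.max(high, seqno);
        return true;
    }

    long high() {
        return high;
    }

    public static void main(String[] args) {
        AddSketch<String> b = new AddSketch<>(4, 10);
        System.out.println(b.add(11, "a"));  // true
        System.out.println(b.add(11, "b"));  // false (slot already taken)
        System.out.println(b.add(15, "c"));  // false (outside the window of 4)
    }
}
```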
      • add

        public boolean add(MessageBatch batch,
                           java.util.function.Function<T,java.lang.Long> seqno_getter,
                           boolean remove_from_batch,
                           T const_value)
        Specified by:
        add in class Buffer<T>
      • add

        public boolean add(java.util.List<LongTuple<T>> list,
                           boolean remove_added_elements,
                           T const_value)
        Description copied from class: Buffer
        Adds elements from the list
        Specified by:
        add in class Buffer<T>
        Parameters:
        list - The list of tuples of seqnos and elements. If remove_added_elements is true, elements that could not be added (e.g. because they were already present or the seqno was < HD) are removed from list
        remove_added_elements - If true, elements that could not be added to the buffer are removed from list
        const_value - If non-null, this value should be used rather than the values of the list tuples
        Returns:
        True if at least 1 element was added successfully, false otherwise.
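The list variant can be pictured with the sketch below, which follows the parameter descriptions above literally: entries that could not be added are removed from the list, and a non-null constant replaces the tuple values. Everything here is an assumption for illustration: ListAddSketch and addAll are hypothetical names, a Map stands in for the buffer, and Map.Entry stands in for LongTuple.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of adding a list of (seqno, element) pairs; not the JGroups implementation.
final class ListAddSketch {

    // Returns true if at least one element was added.
    static <T> boolean addAll(Map<Long, T> buffer,
                              List<Map.Entry<Long, T>> list,
                              boolean removeNotAdded,
                              T constValue) {
        boolean added = false;
        for (Iterator<Map.Entry<Long, T>> it = list.iterator(); it.hasNext(); ) {
            Map.Entry<Long, T> entry = it.next();
            // A non-null constant overrides the value carried by the tuple.
            T value = constValue != null ? constValue : entry.getValue();
            if (buffer.putIfAbsent(entry.getKey(), value) == null)
                added = true;          // the seqno was not present yet
            else if (removeNotAdded)
                it.remove();           // could not be added: drop it from the list
        }
        return added;
    }
}
```

With removeNotAdded set, the caller is left with a list holding only the entries that actually went into the buffer.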
      • remove

        public T remove(boolean nullify)
        Removes the next non-null element and advances hd
        Specified by:
        remove in class Buffer<T>
        Returns:
        The element at hd+1 if it is non-null, otherwise null
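The behaviour of remove at a gap can be sketched as follows: hd advances only when the slot at hd+1 holds an element. The class RemoveSketch and its put helper are hypothetical, and bounds checks are omitted; this is an illustration of the description above rather than the FixedBuffer code.

```java
// Hypothetical sketch of in-order removal; not the JGroups implementation.
final class RemoveSketch<T> {
    private final Object[] buf;
    private long hd;   // highest delivered seqno

    RemoveSketch(int capacity, long offset) {
        this.buf = new Object[capacity];
        this.hd = offset;
    }

    // Test helper: stores an element in the slot for seqno (no bounds checks).
    void put(long seqno, T element) {
        buf[(int) (seqno % buf.length)] = element;
    }

    // Returns the element at hd + 1 and advances hd, or returns null if it is missing.
    @SuppressWarnings("unchecked")
    T remove(boolean nullify) {
        int idx = (int) ((hd + 1) % buf.length);
        T element = (T) buf[idx];
        if (element == null)
            return null;       // gap: hd stays where it is
        if (nullify)
            buf[idx] = null;   // otherwise the slot is left for a later purge
        hd++;
        return element;
    }

    long hd() {
        return hd;
    }

    public static void main(String[] args) {
        RemoveSketch<String> r = new RemoveSketch<>(4, 0);
        r.put(1, "a");
        r.put(3, "c");
        System.out.println(r.remove(true));  // a
        System.out.println(r.remove(true));  // null (seqno 2 is missing)
        System.out.println(r.hd());          // 1
    }
}
```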
      • removeMany

        public java.util.List<T> removeMany(boolean nullify,
                                            int max_results,
                                            java.util.function.Predicate<T> filter)
        Specified by:
        removeMany in class Buffer<T>
      • removeMany

        public <R> R removeMany(boolean nullify,
                                int max_results,
                                java.util.function.Predicate<T> filter,
                                java.util.function.Supplier<R> result_creator,
                                java.util.function.BiConsumer<R,T> accumulator)
        Specified by:
        removeMany in class Buffer<T>
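The generic removeMany signature pairs a result_creator with an accumulator, much like a mutable reduction. Since the page gives no description for this method, the sketch below rests on assumptions that are not confirmed by the source: max_results <= 0 means "no limit", the filter decides which removed elements reach the result, and the result container is created lazily (so null is returned when nothing was accumulated). RemoveManySketch is a hypothetical name, and a Deque stands in for the run of consecutive elements after hd.

```java
import java.util.Deque;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of supplier/accumulator-style removal; not the JGroups implementation.
final class RemoveManySketch {

    static <T, R> R removeMany(Deque<T> pending,
                               int maxResults,
                               Predicate<T> filter,
                               Supplier<R> resultCreator,
                               BiConsumer<R, T> accumulator) {
        R result = null;
        int removed = 0;
        // Assumption: maxResults <= 0 means "no limit".
        while (!pending.isEmpty() && (maxResults <= 0 || removed < maxResults)) {
            T element = pending.poll();
            removed++;
            // Assumption: a null filter accepts every element.
            if (filter == null || filter.test(element)) {
                if (result == null)
                    result = resultCreator.get();   // created on first accepted element
                accumulator.accept(result, element);
            }
        }
        return result;
    }
}
```

A call such as removeMany(pending, 3, x -> x % 2 == 1, ArrayList::new, List::add) on a deque holding 1 to 5 would remove 1, 2 and 3 and return a list holding 1 and 3.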
      • get

        public T get(long seqno)
        Specified by:
        get in class Buffer<T>
      • _get

        public T _get(long seqno)
        Only used for testing!
        Specified by:
        _get in class Buffer<T>
      • purge

        public int purge(long seqno,
                         boolean force)
        Description copied from class: Buffer
        Purges (nulls) all elements <= seqno.
        Specified by:
        purge in class Buffer<T>
        Parameters:
        seqno - All elements <= seqno will be purged.
        force - If false, seqno is max(seqno,hd), else max(seqno,high). In the latter case (seqno > hd), we might purge elements that have not yet been received
        Returns:
        0. The number of purged elements
      • changeCapacity

        public void changeCapacity(int new_capacity)
        Changes the size of the buffer. This method should NOT be used; it exists only so that perf tests can change the configuration dynamically.
      • forEach

        public void forEach(long from,
                            long to,
                            Buffer.Visitor<T> visitor,
                            boolean nullify)
        Specified by:
        forEach in class Buffer<T>
      • forEach

        public void forEach(long from,
                            long to,
                            Buffer.Visitor<T> visitor,
                            boolean nullify,
                            boolean respect_stop)
      • open

        public void open(boolean b)
        Overrides:
        open in class Buffer<T>
      • iterator

        public java.util.Iterator<T> iterator()
        Returns an iterator over the elements of the ring buffer in the range [LOW+1 .. HIGH]
        Returns:
        FixedBufferIterator
        Throws:
        java.util.NoSuchElementException - if HD is moved forward during the iteration
      • iterator

        public java.util.Iterator<T> iterator(long from,
                                              long to)
        Specified by:
        iterator in class Buffer<T>
      • stream

        public java.util.stream.Stream<T> stream()
        Specified by:
        stream in class Buffer<T>
      • stream

        public java.util.stream.Stream<T> stream(long from,
                                                 long to)
        Specified by:
        stream in class Buffer<T>
      • index

        protected int index(long seqno)
      • block

        protected boolean block(long seqno)
      • increaseCapacity

        protected void increaseCapacity(int new_cap)
      • decreaseCapacity

        protected void decreaseCapacity(int new_cap)