Class RingBufferBundlerLockless2

  • All Implemented Interfaces:
    Bundler

    public class RingBufferBundlerLockless2
    extends BaseBundler
    Lockless bundler that uses a reader thread, which is unparked by exactly one writer thread. TODO: needs to be changed to support loopback (https://issues.redhat.com/browse/JGRP-2831).
    Since:
    4.0
    Author:
    Bela Ban
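    The class description mentions a reader thread that is unparked by exactly one writer thread. The following is a minimal sketch of that general pattern, not the JGroups implementation: the class UnparkSketch and its methods publish, readLoop and demo are hypothetical, and only the field name unparking mirrors a field listed on this page. It assumes a CAS-guarded flag lets at most one writer call unpark at a time, and it uses a timed park so that a missed wake-up is recovered on the next timeout.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of "one reader, unparked by exactly one writer".
// Not the JGroups code; all names except 'unparking' are illustrative.
class UnparkSketch {
    final AtomicInteger pending = new AtomicInteger();    // items published, not yet consumed
    final AtomicBoolean unparking = new AtomicBoolean();  // true while some writer is unparking
    final AtomicInteger consumed = new AtomicInteger();   // items the reader has drained
    volatile boolean running = true;
    final Thread reader = new Thread(this::readLoop, "reader-sketch");

    // Reader: drain whatever is pending, otherwise park until woken or timed out.
    void readLoop() {
        while (running) {
            int n = pending.getAndSet(0);
            if (n > 0) {
                consumed.addAndGet(n);
                continue;
            }
            LockSupport.parkNanos(1_000_000L); // timed park covers a missed unpark
        }
        consumed.addAndGet(pending.getAndSet(0)); // final drain after shutdown
    }

    // Writer: publish one item; only the writer that wins the CAS unparks the reader.
    void publish() {
        pending.incrementAndGet();
        if (unparking.compareAndSet(false, true)) {
            try {
                LockSupport.unpark(reader);
            } finally {
                unparking.set(false);
            }
        }
    }

    // Runs 'writers' threads that each publish 'perWriter' items; returns the consumed count.
    static int demo(int writers, int perWriter) {
        UnparkSketch s = new UnparkSketch();
        s.reader.start();
        Thread[] threads = new Thread[writers];
        for (int i = 0; i < writers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perWriter; j++) s.publish();
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
            s.running = false;
            LockSupport.unpark(s.reader);
            s.reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return s.consumed.get();
    }
}
```

    In this sketch, writers that lose the CAS skip the unpark call entirely, which keeps the common send path cheap when many threads publish at once.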
    • Field Detail

      • read_index

        protected final java.util.concurrent.atomic.AtomicInteger read_index
      • ri

        protected int ri
      • write_index

        protected final java.util.concurrent.atomic.AtomicInteger write_index
      • accumulated_bytes

        protected final java.util.concurrent.atomic.AtomicLong accumulated_bytes
      • num_threads

        protected final java.util.concurrent.atomic.AtomicInteger num_threads
      • unparking

        protected final java.util.concurrent.atomic.AtomicBoolean unparking
      • bundler_thread

        protected Runner bundler_thread
      • run_function

        protected final java.lang.Runnable run_function
      • THREAD_NAME

        protected static final java.lang.String THREAD_NAME
      • NULL_MSG

        public static final Message NULL_MSG
    • Constructor Detail

      • RingBufferBundlerLockless2

        public RingBufferBundlerLockless2()
      • RingBufferBundlerLockless2

        public RingBufferBundlerLockless2(int capacity)
      • RingBufferBundlerLockless2

        public RingBufferBundlerLockless2(int capacity,
                                          boolean padded)
    • Method Detail

      • readIndex

        public int readIndex()
      • writeIndex

        public int writeIndex()
      • getQueueSize

        public int getQueueSize()
        Description copied from interface: Bundler
        If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
        This method needs to be fast as it might get called on every message to be sent.
        Specified by:
        getQueueSize in interface Bundler
        Overrides:
        getQueueSize in class BaseBundler
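        As an illustration of how a queuing discipline might consume the value returned by getQueueSize(), here is a heavily simplified sketch in the spirit of Random Early Detection. It is not the JGroups implementation: the class QueueDisciplineSketch, the method dropProbability and both thresholds are hypothetical, and real RED works on an averaged queue size and a capped maximum probability. The only behavior taken from the description above is that a return value of -1 means the queue is not managed.

```java
// Hypothetical sketch: maps a queue size to a drop probability.
// A negative size (e.g. -1) means "no managed queue", so nothing is dropped.
class QueueDisciplineSketch {
    static double dropProbability(int queueSize, int minThreshold, int maxThreshold) {
        if (queueSize < 0) return 0.0;              // unmanaged queue
        if (queueSize <= minThreshold) return 0.0;  // short queue: never drop
        if (queueSize >= maxThreshold) return 1.0;  // full queue: always drop
        // Linear ramp between the two thresholds
        return (double) (queueSize - minThreshold) / (maxThreshold - minThreshold);
    }
}
```

        Because such a check could run for every message sent, the sketch does only integer comparisons and one division, in line with the note that getQueueSize() needs to be fast.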
      • size

        public int size()
        Description copied from class: BaseBundler
        Returns the total number of messages in the hashmap
        Specified by:
        size in interface Bundler
        Overrides:
        size in class BaseBundler
      • _size

        protected int _size(int ri,
                            int wi)
      • init

        public void init(TP transport)
        Description copied from interface: Bundler
        Called after creation of the bundler
        Specified by:
        init in interface Bundler
        Overrides:
        init in class BaseBundler
        Parameters:
        transport - the transport, for further reference
      • renameThread

        public void renameThread()
      • send

        public void send(Message msg)
                  throws java.lang.Exception
        Specified by:
        send in interface Bundler
        Overrides:
        send in class BaseBundler
        Throws:
        java.lang.Exception
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object
      • unparkIfNeeded

        protected void unparkIfNeeded(int size)
      • getWriteIndex

        protected int getWriteIndex(int current_read_index)
      • _readMessages

        public int _readMessages()
      • advanceReadIndex

        protected boolean advanceReadIndex(int wi)
      • readMessages

        protected void readMessages()
      • sendBundledMessages

        protected int sendBundledMessages(Message[] buf,
                                          int read_index,
                                          int write_index)
        Reads and sends the messages in the range [read_index+1 .. write_index-1].
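        The range [read_index+1 .. write_index-1] can wrap around the end of the buffer. The following minimal sketch shows one way to walk such a range; it is not the JGroups code. The class RingRangeSketch and the method readRange are hypothetical, String elements stand in for Message, and the power-of-two capacity (which lets index wrap with a bit mask) is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: collect the elements in [readIndex+1 .. writeIndex-1]
// of a ring buffer, wrapping around the end of the array.
class RingRangeSketch {
    // Wraps idx into [0, capacity); assumes capacity is a power of two.
    static int index(int idx, int capacity) {
        return idx & (capacity - 1);
    }

    static List<String> readRange(String[] buf, int readIndex, int writeIndex) {
        List<String> out = new ArrayList<>();
        int cap = buf.length;
        int end = index(writeIndex, cap); // exclusive upper bound
        for (int i = index(readIndex + 1, cap); i != end; i = index(i + 1, cap)) {
            if (buf[i] != null) {
                out.add(buf[i]);
                buf[i] = null; // clear the slot so it can be reused by writers
            }
        }
        return out;
    }
}
```

        With a buffer of capacity 8, readIndex 6 and writeIndex 2, the loop visits slots 7, 0 and 1 in that order; when write_index is read_index+1 the range is empty and nothing is read.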
      • marshalMessagesToSameDestination

        protected int marshalMessagesToSameDestination(Address dest,
                                                       Message[] buf,
                                                       int start_index,
                                                       int end_index,
                                                       int max_bundle_size)
                                                throws java.lang.Exception
        Throws:
        java.lang.Exception
      • increment

        protected final int increment(int index)
      • index

        protected final int index(int idx)
      • assertPositive

        protected static int assertPositive(int value,
                                            java.lang.String message)