Class RingBufferBundler

  • All Implemented Interfaces:
    Bundler

    public class RingBufferBundler
    extends BaseBundler
    Bundler which uses RingBuffer to store messages. The difference to TransferQueueBundler is that RingBuffer uses a wait strategy (to for example spinning) before blocking. Also, the hashmap of the superclass is not used, but the array of the RingBuffer is used directly to bundle and send messages, minimizing memory allocation.
    • Field Detail

      • bundler_thread

        protected Runner bundler_thread
      • num_spins

        protected int num_spins
      • wait_strategy

        protected java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> wait_strategy
      • run_function

        protected final java.lang.Runnable run_function
      • SPIN

        protected static final java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> SPIN
      • YIELD

        protected static final java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> YIELD
      • PARK

        protected static final java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> PARK
      • SPIN_PARK

        protected static final java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> SPIN_PARK
      • SPIN_YIELD

        protected static final java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> SPIN_YIELD
    • Constructor Detail

      • RingBufferBundler

        public RingBufferBundler()
      • RingBufferBundler

        public RingBufferBundler​(int capacity)
    • Method Detail

      • size

        public int size()
        Description copied from class: BaseBundler
        Returns the total number of messages in the hashmap
        Specified by:
        size in interface Bundler
        Overrides:
        size in class BaseBundler
      • getQueueSize

        public int getQueueSize()
        Description copied from interface: Bundler
        If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
        This method needs to be fast as it might get called on every message to be sent.
        Specified by:
        getQueueSize in interface Bundler
        Overrides:
        getQueueSize in class BaseBundler
      • numSpins

        public int numSpins()
      • waitStrategy

        public java.lang.String waitStrategy()
      • init

        public void init​(TP transport)
        Description copied from interface: Bundler
        Called after creation of the bundler
        Specified by:
        init in interface Bundler
        Overrides:
        init in class BaseBundler
        Parameters:
        transport - the transport, for further reference
      • renameThread

        public void renameThread()
      • send

        public void send​(Message msg)
                  throws java.lang.Exception
        Specified by:
        send in interface Bundler
        Overrides:
        send in class BaseBundler
        Throws:
        java.lang.Exception
      • sendBundledMessages

        public void sendBundledMessages​(Message[] buf,
                                        int read_index,
                                        int available_msgs)
        Read and send messages in range [read-index .. read-index+available_msgs-1]
      • _loopback

        protected void _loopback​(Address dest,
                                 Address sender,
                                 Message[] buf,
                                 int start_index,
                                 int end_index)
      • _send

        protected void _send​(Address dest,
                             Message msg,
                             byte[] cluster_name,
                             Message[] buf,
                             int start,
                             int end,
                             java.util.List<Message> list)
      • marshalMessagesToSameDestination

        protected int marshalMessagesToSameDestination​(Address dest,
                                                       Message[] buf,
                                                       int start_index,
                                                       int end_index,
                                                       int max_bundle_size,
                                                       java.util.List<Message> list)
                                                throws java.lang.Exception
        Throws:
        java.lang.Exception
      • readMessages

        protected void readMessages()
      • advance

        protected final int advance​(int index)
      • index

        protected final int index​(int idx)
      • print

        protected static java.lang.String print​(java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> wait_strategy)
      • createWaitStrategy

        protected java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> createWaitStrategy​(java.lang.String st,
                                                                                                              java.util.function.BiConsumer<java.lang.Integer,​java.lang.Integer> default_wait_strategy)
      • assertPositive

        protected static int assertPositive​(int value,
                                            java.lang.String message)