Package org.jgroups.protocols
Class RingBufferBundler
- java.lang.Object
-
- org.jgroups.protocols.BaseBundler
-
- org.jgroups.protocols.RingBufferBundler
-
- All Implemented Interfaces:
Bundler
public class RingBufferBundler extends BaseBundler
Bundler which usesRingBufferto store messages. The difference toTransferQueueBundleris that RingBuffer uses a wait strategy (to for example spinning) before blocking. Also, the hashmap of the superclass is not used, but the array of the RingBuffer is used directly to bundle and send messages, minimizing memory allocation.
-
-
Field Summary
Fields Modifier and Type Field Description protected Runnerbundler_threadprotected intcapacityprotected intnum_spinsprotected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>PARKprotected RingBuffer<Message>rbprotected java.lang.Runnablerun_functionprotected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>SPINprotected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>SPIN_PARKprotected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>SPIN_YIELDprotected static java.lang.StringTHREAD_NAMEprotected java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>wait_strategyprotected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>YIELD
-
Constructor Summary
Constructors Modifier Constructor Description RingBufferBundler()RingBufferBundler(int capacity)protectedRingBufferBundler(RingBuffer<Message> rb)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description protected intadvance(int index)protected static intassertPositive(int value, java.lang.String message)RingBuffer<Message>buf()protected java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>createWaitStrategy(java.lang.String st, java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> default_wait_strategy)intgetQueueSize()If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.java.lang.ThreadgetThread()protected intindex(int idx)voidinit(TP transport)Called after creation of the bundlerprotected intmarshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size)intnumSpins()RingBufferBundlernumSpins(int n)protected static java.lang.Stringprint(java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> wait_strategy)protected voidreadMessages()voidsend(Message msg)voidsendBundledMessages(Message[] buf, int read_index, int available_msgs)Read and send messages in range [read-index ..intsize()Returns the total number of messages in the hashmapvoidstart()Called afterBundler.init(TP)voidstop()java.lang.StringwaitStrategy()RingBufferBundlerwaitStrategy(java.lang.String st)-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, clearMessages, sendBundledMessages, sendMessageList, sendSingleMessage, viewChange
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.jgroups.protocols.Bundler
getStats, resetStats
-
-
-
-
Field Detail
-
rb
protected RingBuffer<Message> rb
-
bundler_thread
protected Runner bundler_thread
-
num_spins
protected int num_spins
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
- Constant Field Values
-
wait_strategy
protected java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> wait_strategy
-
capacity
protected int capacity
-
run_function
protected final java.lang.Runnable run_function
-
SPIN
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> SPIN
-
YIELD
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> YIELD
-
PARK
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> PARK
-
SPIN_PARK
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> SPIN_PARK
-
SPIN_YIELD
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> SPIN_YIELD
-
-
Constructor Detail
-
RingBufferBundler
public RingBufferBundler()
-
RingBufferBundler
protected RingBufferBundler(RingBuffer<Message> rb)
-
RingBufferBundler
public RingBufferBundler(int capacity)
-
-
Method Detail
-
buf
public RingBuffer<Message> buf()
-
getThread
public java.lang.Thread getThread()
-
size
public int size()
Description copied from class:BaseBundlerReturns the total number of messages in the hashmap- Specified by:
sizein interfaceBundler- Overrides:
sizein classBaseBundler
-
getQueueSize
public int getQueueSize()
Description copied from interface:BundlerIf the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.
-
numSpins
public int numSpins()
-
numSpins
public RingBufferBundler numSpins(int n)
-
waitStrategy
public java.lang.String waitStrategy()
-
waitStrategy
public RingBufferBundler waitStrategy(java.lang.String st)
-
init
public void init(TP transport)
Description copied from interface:BundlerCalled after creation of the bundler- Specified by:
initin interfaceBundler- Overrides:
initin classBaseBundler- Parameters:
transport- the transport, for further reference
-
start
public void start()
Description copied from interface:BundlerCalled afterBundler.init(TP)- Specified by:
startin interfaceBundler- Overrides:
startin classBaseBundler
-
stop
public void stop()
- Specified by:
stopin interfaceBundler- Overrides:
stopin classBaseBundler
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
sendin interfaceBundler- Overrides:
sendin classBaseBundler- Throws:
java.lang.Exception
-
sendBundledMessages
public void sendBundledMessages(Message[] buf, int read_index, int available_msgs)
Read and send messages in range [read-index .. read-index+available_msgs-1]
-
marshalMessagesToSameDestination
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size) throws java.lang.Exception
- Throws:
java.lang.Exception
-
readMessages
protected void readMessages()
-
advance
protected final int advance(int index)
-
index
protected final int index(int idx)
-
print
protected static java.lang.String print(java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> wait_strategy)
-
createWaitStrategy
protected java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> createWaitStrategy(java.lang.String st, java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> default_wait_strategy)
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-