Package org.jgroups.protocols
Class RingBufferBundler
- java.lang.Object
  - org.jgroups.protocols.BaseBundler
    - org.jgroups.protocols.RingBufferBundler
All Implemented Interfaces: Bundler
public class RingBufferBundler extends BaseBundler
Bundler which uses RingBuffer to store messages. The difference to TransferQueueBundler is that RingBuffer applies a wait strategy (for example, spinning) before blocking. Also, the hashmap of the superclass is not used; instead, the array of the RingBuffer is used directly to bundle and send messages, which minimizes memory allocation.
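The idea of applying a wait strategy before blocking can be illustrated with a minimal, self-contained sketch. It is not the JGroups implementation: the class `WaitStrategySketch`, the method `waitFor` and the strategy bodies are assumptions made for illustration. Only the `BiConsumer<Integer,Integer>` shape is taken from the field types listed on this page, and the meaning given to the two arguments (current iteration, maximum number of spins) is also an assumption.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;

// Illustrative sketch only; names and strategy bodies are hypothetical.
class WaitStrategySketch {
    // Assumed argument meaning: (current iteration, maximum number of spins)
    static final BiConsumer<Integer,Integer> SPIN  = (it, spins) -> Thread.onSpinWait();
    static final BiConsumer<Integer,Integer> YIELD = (it, spins) -> Thread.yield();
    static final BiConsumer<Integer,Integer> PARK  = (it, spins) -> LockSupport.parkNanos(1);

    /**
     * Polls the condition up to maxSpins times, applying the strategy between polls.
     * Returns true if the condition became true; on false, a caller would fall back
     * to blocking (for example on a lock or condition).
     */
    static boolean waitFor(BooleanSupplier ready, int maxSpins,
                           BiConsumer<Integer,Integer> strategy) {
        for (int i = 0; i < maxSpins; i++) {
            if (ready.getAsBoolean())
                return true;
            strategy.accept(i, maxSpins);
        }
        return ready.getAsBoolean();
    }
}
```

Spinning or yielding for a bounded number of iterations avoids the cost of a blocking wake-up when a message is likely to arrive soon, at the price of some CPU use while waiting.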
-
-
Field Summary
Fields (Modifier and Type, Field, Description):
protected Runner
bundler_thread
protected int
num_spins
protected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>
PARK
protected RingBuffer<Message>
rb
protected java.lang.Runnable
run_function
protected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>
SPIN
protected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>
SPIN_PARK
protected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>
SPIN_YIELD
protected static java.lang.String
THREAD_NAME
protected java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>
wait_strategy
protected static java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>
YIELD
-
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_send_time, capacity, count, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, output, transport
-
-
Constructor Summary
Constructors (Modifier, Constructor, Description):
RingBufferBundler()
RingBufferBundler(int capacity)
protected
RingBufferBundler(RingBuffer<Message> rb)
-
Method Summary
Methods (Modifier and Type, Method, Description):
protected void
_loopback(Address dest, Address sender, Message[] buf, int start_index, int end_index)
protected void
_send(Address dest, Message msg, byte[] cluster_name, Message[] buf, int start, int end, java.util.List<Message> list)
protected int
advance(int index)
protected static int
assertPositive(int value, java.lang.String message)
RingBuffer<Message>
buf()
protected java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer>
createWaitStrategy(java.lang.String st, java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> default_wait_strategy)
int
getQueueSize()
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.
protected int
index(int idx)
void
init(TP transport)
Called after creation of the bundler
protected int
marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size, java.util.List<Message> list)
int
numSpins()
RingBufferBundler
numSpins(int n)
protected static java.lang.String
print(java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> wait_strategy)
protected void
readMessages()
void
renameThread()
void
send(Message msg)
void
sendBundledMessages(Message[] buf, int read_index, int available_msgs)
Read and send messages in range [read-index .. read-index+available_msgs-1]
int
size()
Returns the total number of messages in the hashmap
void
start()
Called after Bundler.init(TP)
void
stop()
java.lang.String
waitStrategy()
RingBufferBundler
waitStrategy(java.lang.String st)
-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, getCapacity, getMaxSize, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, resetStats, sendBundledMessages, sendMessageList, sendMessageList, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize, viewChange
-
-
-
-
Field Detail
-
rb
protected RingBuffer<Message> rb
-
bundler_thread
protected Runner bundler_thread
-
num_spins
protected int num_spins
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
- Constant Field Values
-
wait_strategy
protected java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> wait_strategy
-
run_function
protected final java.lang.Runnable run_function
-
SPIN
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> SPIN
-
YIELD
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> YIELD
-
PARK
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> PARK
-
SPIN_PARK
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> SPIN_PARK
-
SPIN_YIELD
protected static final java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> SPIN_YIELD
-
-
Constructor Detail
-
RingBufferBundler
public RingBufferBundler()
-
RingBufferBundler
protected RingBufferBundler(RingBuffer<Message> rb)
-
RingBufferBundler
public RingBufferBundler(int capacity)
-
-
Method Detail
-
buf
public RingBuffer<Message> buf()
-
size
public int size()
Description copied from class: BaseBundler
Returns the total number of messages in the hashmap
- Specified by:
size in interface Bundler
- Overrides:
size in class BaseBundler
-
getQueueSize
public int getQueueSize()
Description copied from interface: Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.
- Specified by:
getQueueSize in interface Bundler
- Overrides:
getQueueSize in class BaseBundler
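How a queuing discipline might consume this return value can be sketched as follows. This is a deliberately simplified, hypothetical illustration and not JGroups code: the class `QueueDisciplineSketch`, the method `dropProbability` and the thresholds are assumptions. Real Random Early Detection works on an averaged queue size and a configurable maximum probability, which this sketch omits.

```java
// Illustrative sketch only; names and thresholds are hypothetical.
class QueueDisciplineSketch {
    /**
     * Returns a drop probability in [0,1] that grows linearly between the two
     * thresholds. A negative queue size (the -1 case) means the queue is not
     * managed, so nothing is ever dropped.
     */
    static double dropProbability(int queueSize, int minThreshold, int maxThreshold) {
        if (queueSize < 0)
            return 0.0; // -1: queue is not managed by the discipline
        if (queueSize <= minThreshold)
            return 0.0;
        if (queueSize >= maxThreshold)
            return 1.0;
        return (double)(queueSize - minThreshold) / (maxThreshold - minThreshold);
    }
}
```

Because a check like this may run for every message sent, the queue size it reads has to be cheap to obtain, which is the reason for the speed requirement stated above.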
-
numSpins
public int numSpins()
-
numSpins
public RingBufferBundler numSpins(int n)
-
waitStrategy
public java.lang.String waitStrategy()
-
waitStrategy
public RingBufferBundler waitStrategy(java.lang.String st)
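The signatures above pair a no-argument getter with a same-named setter that returns the bundler itself, which allows chained configuration. The sketch below shows that pattern on a stand-in class. `FluentConfigSketch`, its default values and the strategy name strings are hypothetical, since the values accepted by `waitStrategy(String)` are not listed on this page.

```java
// Stand-in class illustrating the getter/fluent-setter convention; not JGroups code.
class FluentConfigSketch {
    private int numSpins = 40;            // arbitrary default chosen for this sketch
    private String waitStrategy = "park"; // placeholder name, not a documented value

    int numSpins() { return numSpins; }

    FluentConfigSketch numSpins(int n) {
        this.numSpins = n;
        return this; // returning this enables chaining
    }

    String waitStrategy() { return waitStrategy; }

    FluentConfigSketch waitStrategy(String st) {
        this.waitStrategy = st;
        return this;
    }
}
```

With this shape, configuration reads as a single expression, for example `new FluentConfigSketch().numSpins(100).waitStrategy("spin")`.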
-
init
public void init(TP transport)
Description copied from interface: Bundler
Called after creation of the bundler
- Specified by:
init in interface Bundler
- Overrides:
init in class BaseBundler
- Parameters:
transport - the transport, for further reference
-
start
public void start()
Description copied from interface: Bundler
Called after Bundler.init(TP)
- Specified by:
start in interface Bundler
- Overrides:
start in class BaseBundler
-
stop
public void stop()
- Specified by:
stop in interface Bundler
- Overrides:
stop in class BaseBundler
-
renameThread
public void renameThread()
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
send in interface Bundler
- Overrides:
send in class BaseBundler
- Throws:
java.lang.Exception
-
sendBundledMessages
public void sendBundledMessages(Message[] buf, int read_index, int available_msgs)
Read and send messages in range [read-index .. read-index+available_msgs-1]
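Reading a contiguous range out of a ring buffer's backing array can be sketched as below. This is an assumption-laden illustration rather than the actual method body: the class `RingRangeSketch` and the method `readRange` are hypothetical, and the sketch assumes that indices past the end of the array wrap around to the start, which this page does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; names are hypothetical.
class RingRangeSketch {
    /**
     * Collects the non-null elements in the range
     * [readIndex .. readIndex+available-1], wrapping at buf.length (assumed).
     */
    static <T> List<T> readRange(T[] buf, int readIndex, int available) {
        List<T> out = new ArrayList<>(available);
        for (int i = 0; i < available; i++) {
            int idx = (readIndex + i) % buf.length; // wrap around the end of the array
            T element = buf[idx];
            if (element != null)
                out.add(element);
        }
        return out;
    }
}
```

Working on the array in place, instead of copying messages into a separate per-destination map first, is what keeps allocation low in this kind of design.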
-
_loopback
protected void _loopback(Address dest, Address sender, Message[] buf, int start_index, int end_index)
-
_send
protected void _send(Address dest, Message msg, byte[] cluster_name, Message[] buf, int start, int end, java.util.List<Message> list)
-
marshalMessagesToSameDestination
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size, java.util.List<Message> list) throws java.lang.Exception
- Throws:
java.lang.Exception
-
readMessages
protected void readMessages()
-
advance
protected final int advance(int index)
-
index
protected final int index(int idx)
-
print
protected static java.lang.String print(java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> wait_strategy)
-
createWaitStrategy
protected java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> createWaitStrategy(java.lang.String st, java.util.function.BiConsumer<java.lang.Integer,java.lang.Integer> default_wait_strategy)
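The signature suggests resolving a strategy from a string, with a fallback when the string cannot be resolved. A minimal sketch of such a lookup follows. It is hypothetical throughout: the class `StrategyLookupSketch`, the name strings and the strategy bodies are assumptions, and the real method may resolve names differently.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;

// Illustrative sketch only; names and strategy bodies are hypothetical.
class StrategyLookupSketch {
    static final BiConsumer<Integer,Integer> SPIN  = (it, spins) -> Thread.onSpinWait();
    static final BiConsumer<Integer,Integer> YIELD = (it, spins) -> Thread.yield();
    static final BiConsumer<Integer,Integer> PARK  = (it, spins) -> LockSupport.parkNanos(1);

    // Placeholder names; the names accepted by the real class are not listed here.
    static final Map<String, BiConsumer<Integer,Integer>> BY_NAME =
        Map.of("spin", SPIN, "yield", YIELD, "park", PARK);

    /** Returns the strategy registered under the given name, or the default if none matches. */
    static BiConsumer<Integer,Integer> create(String st,
                                              BiConsumer<Integer,Integer> defaultStrategy) {
        if (st == null)
            return defaultStrategy;
        return BY_NAME.getOrDefault(st.trim().toLowerCase(Locale.ROOT), defaultStrategy);
    }
}
```

Falling back to a default rather than throwing keeps a misspelled configuration value from preventing startup, though it can also hide the typo.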
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-