Package org.jgroups.protocols
Class RemoveQueueBundler
java.lang.Object
  org.jgroups.protocols.BaseBundler
    org.jgroups.protocols.RemoveQueueBundler
- All Implemented Interfaces:
Bundler
public class RemoveQueueBundler extends BaseBundler
Bundler implementation which sends message batches (or single messages) as soon as the remove queue is full (or max_bundler_size would be exceeded).
Messages are removed from the main queue and processed as follows (assuming they all fit into the remove queue):
A B B C C A causes the following sends: {AA} -> {CC} -> {BB}
Note that null is also a valid destination (send-to-all).
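The grouping described above can be illustrated with a minimal sketch. It is not the JGroups implementation: the `Msg` class and the `groupByDestination` helper are hypothetical stand-ins, and destinations are modeled as plain strings. A `HashMap` is assumed here because it accepts a `null` key, which represents the send-to-all destination; the order in which the resulting batches would be sent depends on the map's iteration order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: Msg and groupByDestination are hypothetical, not JGroups API.
final class DestinationGrouping {

    // Minimal stand-in for a message: a destination (null = send-to-all) and a payload.
    static final class Msg {
        final String dest;
        final String payload;
        Msg(String dest, String payload) { this.dest = dest; this.payload = payload; }
    }

    // Groups the first 'count' messages of the remove queue by destination.
    // HashMap accepts a null key, so send-to-all messages form their own group.
    static Map<String, List<Msg>> groupByDestination(Msg[] removeQueue, int count) {
        Map<String, List<Msg>> batches = new HashMap<>();
        for (int i = 0; i < count; i++) {
            Msg m = removeQueue[i];
            batches.computeIfAbsent(m.dest, k -> new ArrayList<>()).add(m);
        }
        return batches;
    }

    public static void main(String[] args) {
        Msg[] queue = {
            new Msg("A", "1"), new Msg("B", "2"), new Msg("B", "3"),
            new Msg("C", "4"), new Msg("C", "5"), new Msg("A", "6"),
            new Msg(null, "7")
        };
        // One send per destination: a batch if several messages share it, else a single message.
        groupByDestination(queue, queue.length).forEach((dest, list) ->
            System.out.println(dest + ": " + list.size() + " message(s)"));
    }
}
```

Each map entry corresponds to one send, so six messages to three destinations result in three sends instead of six.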
Contrary to TransferQueueBundler, this bundler uses a RingBuffer rather than an ArrayBlockingQueue, and the size of the remove queue is fixed. TransferQueueBundler increases the size of the remove queue dynamically, which leads to higher latency if the remove queue grows too much.
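To make the effect of a fixed-size remove queue concrete, here is a minimal sketch under stated assumptions: an `ArrayDeque` stands in for the main queue (it is not the JGroups RingBuffer), messages are plain strings, and the `drain` helper is hypothetical. The point is only that each pass moves at most `removeQueue.length` messages, so the amount of work per pass stays bounded.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative only: not the JGroups RingBuffer or its API.
final class FixedRemoveQueue {

    // Moves at most removeQueue.length elements from the main queue into the
    // fixed-size array and returns how many were moved. The array never grows.
    static int drain(Queue<String> mainQueue, String[] removeQueue) {
        int n = 0;
        while (n < removeQueue.length) {
            String msg = mainQueue.poll();
            if (msg == null)
                break; // main queue is empty
            removeQueue[n++] = msg;
        }
        return n;
    }

    public static void main(String[] args) {
        Queue<String> mainQueue = new ArrayDeque<>();
        for (int i = 0; i < 5; i++)
            mainQueue.add("m" + i);

        String[] removeQueue = new String[2]; // fixed capacity
        int n;
        while ((n = drain(mainQueue, removeQueue)) > 0)
            System.out.println("processing " + n + " message(s)");
    }
}
```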
JIRA: https://issues.redhat.com/browse/JGRP-2171
- Since:
- 4.0.4
- Author:
- Bela Ban
-
-
Field Summary
Fields
Modifier and Type                        Field             Description
protected AverageMinMax                  avg_batch_size
protected int                            queue_size
protected RingBuffer<Message>            rb
protected Message[]                      remove_queue
protected Runner                         runner
protected static java.lang.String        THREAD_NAME
-
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_send_time, capacity, count, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, output, transport
-
-
Constructor Summary
Constructors
Constructor              Description
RemoveQueueBundler()
-
Method Summary
Modifier and Type, Method, Description
java.lang.String avgBatchSize()
int getQueueSize()
    If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.
void init(TP transport)
    Called after creation of the bundler
void renameThread()
void resetStats()
int ringBufferSize()
void run()
void send(Message msg)
protected void sendMessageList(Address dest, Address src, java.util.List<Message> list, ByteArrayDataOutputStream out)
int size()
    Returns the total number of messages in the hashmap
void start()
    Called after Bundler.init(TP)
void stop()
-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, getCapacity, getMaxSize, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, sendBundledMessages, sendMessageList, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize, viewChange
-
-
-
-
Field Detail
-
rb
protected RingBuffer<Message> rb
-
runner
protected Runner runner
-
remove_queue
protected Message[] remove_queue
-
avg_batch_size
protected final AverageMinMax avg_batch_size
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
- Constant Field Values
-
queue_size
protected int queue_size
-
-
Method Detail
-
avgBatchSize
public java.lang.String avgBatchSize()
-
ringBufferSize
public int ringBufferSize()
-
resetStats
public void resetStats()
- Specified by:
resetStats in interface Bundler
- Overrides:
resetStats in class BaseBundler
-
init
public void init(TP transport)
Description copied from interface: Bundler
Called after creation of the bundler
- Specified by:
init in interface Bundler
- Overrides:
init in class BaseBundler
- Parameters:
transport - the transport, for further reference
-
start
public void start()
Description copied from interface: Bundler
Called after Bundler.init(TP)
- Specified by:
start in interface Bundler
- Overrides:
start in class BaseBundler
-
stop
public void stop()
- Specified by:
stop in interface Bundler
- Overrides:
stop in class BaseBundler
-
renameThread
public void renameThread()
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
send in interface Bundler
- Overrides:
send in class BaseBundler
- Throws:
java.lang.Exception
-
run
public void run()
-
getQueueSize
public int getQueueSize()
Description copied from interface: Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.
- Specified by:
getQueueSize in interface Bundler
- Overrides:
getQueueSize in class BaseBundler
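As a rough illustration of how a queuing discipline might consume this value, the sketch below shows a simplified Random Early Detection style decision. Everything in it is an assumption made for illustration: the `RedSketch` class, its thresholds, and the linear drop probability are hypothetical and are not taken from JGroups. Full RED typically works on an averaged queue size and caps the drop probability; both are omitted here for brevity.

```java
// Illustrative only: a simplified RED-style decision, not JGroups code.
final class RedSketch {

    // Returns the probability of dropping a message for the given queue size.
    // A queue size of -1 means the queue is not managed, so nothing is dropped.
    static double dropProbability(int queueSize, int minThreshold, int maxThreshold) {
        if (queueSize < 0 || queueSize <= minThreshold)
            return 0.0;
        if (queueSize >= maxThreshold)
            return 1.0;
        // Linear ramp between the two thresholds.
        return (double) (queueSize - minThreshold) / (maxThreshold - minThreshold);
    }

    // 'random' is expected to be uniformly distributed in [0, 1).
    static boolean shouldDrop(int queueSize, int minThreshold, int maxThreshold, double random) {
        return random < dropProbability(queueSize, minThreshold, maxThreshold);
    }

    public static void main(String[] args) {
        int[] sizes = { -1, 5, 15, 25 };
        for (int size : sizes)
            System.out.println("queue size " + size + ": drop probability "
                + dropProbability(size, 10, 20));
    }
}
```

Because such a check may run for every message sent, the queue size lookup it depends on has to be cheap, which is why the method above is required to be fast.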
-
size
public int size()
Description copied from class: BaseBundler
Returns the total number of messages in the hashmap
- Specified by:
size in interface Bundler
- Overrides:
size in class BaseBundler
-
sendMessageList
protected void sendMessageList(Address dest, Address src, java.util.List<Message> list, ByteArrayDataOutputStream out)
- Overrides:
sendMessageList in class BaseBundler
-
-