Package org.jgroups.protocols
Class PerDestinationBundler
- java.lang.Object
  - org.jgroups.protocols.BaseBundler
    - org.jgroups.protocols.PerDestinationBundler
- All Implemented Interfaces:
java.lang.Runnable, Bundler
public class PerDestinationBundler extends BaseBundler implements java.lang.Runnable
Queues messages per destination ('null' is a special destination). Uses 1 thread per destination to process queued messages, so it won't scale to many cluster members (unless virtual threads are used).
See https://issues.redhat.com/browse/JGRP-2639 for details.
- Since:
- 5.2.7
- Author:
- Bela Ban
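The per-destination queueing described above can be pictured with a minimal sketch. It is not the JGroups implementation: the class name, the use of String for addresses and messages, and the NULL_DEST sentinel are all hypothetical, chosen only to show one buffer per destination with a stand-in key for the 'null' destination.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch only, not the JGroups source; all names are hypothetical. */
class PerDestinationQueueSketch {
    // Stand-in key for the 'null' destination (ConcurrentHashMap rejects null keys)
    static final String NULL_DEST = "<null>";

    // One buffer of pending messages per destination
    private final Map<String, List<String>> buffers = new ConcurrentHashMap<>();

    /** Queues a message under its destination; a null destination maps to NULL_DEST. */
    void enqueue(String dest, String msg) {
        String key = dest == null ? NULL_DEST : dest;
        List<String> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        synchronized (buf) {
            buf.add(msg);
        }
    }

    /** Removes and returns everything queued for one destination as a single batch. */
    List<String> drain(String dest) {
        String key = dest == null ? NULL_DEST : dest;
        List<String> buf = buffers.get(key);
        if (buf == null)
            return new ArrayList<>();
        synchronized (buf) {
            List<String> batch = new ArrayList<>(buf);
            buf.clear();
            return batch;
        }
    }

    /** Total number of queued messages across all destinations. */
    int size() {
        int total = 0;
        for (List<String> buf : buffers.values()) {
            synchronized (buf) {
                total += buf.size();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        PerDestinationQueueSketch q = new PerDestinationQueueSketch();
        q.enqueue("A", "m1");
        q.enqueue("A", "m2");
        q.enqueue(null, "m3");
        System.out.println(q.size());      // 3
        System.out.println(q.drain("A"));  // [m1, m2]
        System.out.println(q.drain(null)); // [m3]
    }
}
```

In the sketch, draining is done by the caller. In the class documented here, each destination's buffer is processed by its own thread, or by one shared thread when use_single_sender_thread is set.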
-
-
Nested Class Summary
Nested Classes
Modifier and Type  Class
protected class    PerDestinationBundler.SendBuffer
-
Field Summary
Fields
Modifier and Type                                                  Field
protected java.util.Map<Address,PerDestinationBundler.SendBuffer>  dests
protected Address                                                  local_addr
protected java.util.concurrent.atomic.AtomicBoolean                msgs_available
protected java.util.concurrent.locks.Condition                     not_empty
protected static Address                                           NULL
protected Runner                                                   single_thread_runner
protected static java.lang.String                                  THREAD_NAME
protected boolean                                                  use_single_sender_thread
-
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_fill_count, avg_remove_queue_size, avg_send_time, capacity, count, drop_when_full, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, num_batches_sent, num_drops_on_full_queue, num_sends_because_full_queue, num_sends_because_no_msgs, num_single_msgs_sent, output, remove_queue_capacity, total_msgs_sent, transport
-
-
Constructor Summary
Constructors
Constructor
PerDestinationBundler()
-
Method Summary
All Methods / Instance Methods / Concrete Methods
Modifier and Type      Method and Description
java.lang.String       active()
java.lang.String       dests()
java.lang.String       dump()
int                    getQueueSize()
                       If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.
void                   init(TP transport)
                       Called after creation of the bundler.
boolean                isRunning()
void                   run()
                       Iterates through the send buffers and sends when messages are available.
void                   send(Message msg)
protected void         signalNotEmpty()
int                    size()
                       Returns the total number of messages in the hashmap.
void                   start()
                       Called after Bundler.init(TP).
void                   stop()
boolean                useSingleSenderThread()
PerDestinationBundler  useSingleSenderThread(boolean use_single_thread)
void                   viewChange(View view)
protected void         waitUntilMessagesAreAvailable()
-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, avgBatchSize, dropWhenFull, dropWhenFull, getCapacity, getMaxSize, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, removeQueueCapacity, removeQueueCapacity, resetStats, sendBundledMessages, sendMessageList, sendMessageListArray, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.jgroups.protocols.Bundler
destroy, renameThread
-
-
-
-
Field Detail
-
local_addr
protected Address local_addr
-
dests
protected final java.util.Map<Address,PerDestinationBundler.SendBuffer> dests
-
NULL
protected static final Address NULL
-
single_thread_runner
protected Runner single_thread_runner
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
- Constant Field Values
-
not_empty
protected final java.util.concurrent.locks.Condition not_empty
-
msgs_available
protected final java.util.concurrent.atomic.AtomicBoolean msgs_available
-
use_single_sender_thread
protected boolean use_single_sender_thread
-
-
Method Detail
-
isRunning
public boolean isRunning()
-
useSingleSenderThread
public boolean useSingleSenderThread()
-
useSingleSenderThread
public PerDestinationBundler useSingleSenderThread(boolean use_single_thread)
-
getQueueSize
public int getQueueSize()
Description copied from interface: Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.
- Specified by:
getQueueSize in interface Bundler
- Overrides:
getQueueSize in class BaseBundler
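How a caller might interpret this return value can be sketched as follows. The SizedQueue interface, the thresholds and the dropProbability helper are hypothetical and not part of JGroups. The linear ramp is a simplification in the spirit of Random Early Detection: real RED works on a weighted average of the queue length, not the instantaneous size used here.

```java
/** Illustrative sketch only, not the JGroups source; all names are hypothetical. */
class QueueDisciplineSketch {
    /** Hypothetical stand-in for the one Bundler method used here. */
    interface SizedQueue {
        int getQueueSize();
    }

    /**
     * Returns a drop probability in [0,1] for the next message.
     * A negative size (-1) means the queue is not managed, so nothing is dropped.
     */
    static double dropProbability(SizedQueue q, int minThreshold, int maxThreshold) {
        int size = q.getQueueSize();
        if (size < 0)
            return 0.0;              // -1: queue is unmanaged
        if (size <= minThreshold)
            return 0.0;              // short queue: never drop
        if (size >= maxThreshold)
            return 1.0;              // long queue: always drop
        // In between: probability rises linearly with the queue size
        return (double) (size - minThreshold) / (maxThreshold - minThreshold);
    }

    public static void main(String[] args) {
        System.out.println(dropProbability(() -> -1, 10, 20)); // 0.0
        System.out.println(dropProbability(() -> 15, 10, 20)); // 0.5
        System.out.println(dropProbability(() -> 25, 10, 20)); // 1.0
    }
}
```

Because the helper reads the size on every call, the sketch also shows why getQueueSize() needs to be cheap.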
-
size
public int size()
Description copied from class: BaseBundler
Returns the total number of messages in the hashmap.
- Specified by:
size in interface Bundler
- Overrides:
size in class BaseBundler
-
dump
public java.lang.String dump()
-
active
public java.lang.String active()
-
dests
public java.lang.String dests()
-
init
public void init(TP transport)
Description copied from interface: Bundler
Called after creation of the bundler.
- Specified by:
init in interface Bundler
- Overrides:
init in class BaseBundler
- Parameters:
transport - the transport, for further reference
-
start
public void start()
Description copied from interface: Bundler
Called after Bundler.init(TP).
- Specified by:
start in interface Bundler
- Overrides:
start in class BaseBundler
-
stop
public void stop()
- Specified by:
stop in interface Bundler
- Overrides:
stop in class BaseBundler
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
send in interface Bundler
- Overrides:
send in class BaseBundler
- Throws:
java.lang.Exception
-
run
public void run()
Iterates through the send buffers and sends when messages are available. When an iteration finds no messages to send, the thread blocks on a condition that is signalled as soon as messages are available in any of the buffers.
- Specified by:
run in interface java.lang.Runnable
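The scan-then-block behaviour described for run() can be sketched with a lock, a condition and a flag. This is an assumption about how such a loop could be built, not the JGroups source: the class name, the String messages, the sent list (standing in for the transport) and the stop handling are hypothetical. Only the roles of signalNotEmpty() and waitUntilMessagesAreAvailable() are taken from this page.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative sketch only, not the JGroups source; all names are hypothetical. */
class SenderLoopSketch implements Runnable {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final AtomicBoolean msgsAvailable = new AtomicBoolean(false);
    private final List<Queue<String>> buffers = new ArrayList<>();
    private final List<String> sent = new ArrayList<>(); // stands in for the network
    private volatile boolean running = true;

    SenderLoopSketch(int numBuffers) {
        for (int i = 0; i < numBuffers; i++) buffers.add(new ArrayDeque<>());
    }

    /** Queues a message in one buffer, then wakes the sender thread. */
    void send(int bufferIndex, String msg) {
        Queue<String> buf = buffers.get(bufferIndex);
        synchronized (buf) { buf.add(msg); }
        signalNotEmpty();
    }

    void signalNotEmpty() {
        lock.lock();
        try {
            msgsAvailable.set(true);
            notEmpty.signalAll();
        } finally { lock.unlock(); }
    }

    void waitUntilMessagesAreAvailable() throws InterruptedException {
        lock.lock();
        try {
            while (running && !msgsAvailable.get())
                notEmpty.await();       // the loop guards against spurious wakeups
            msgsAvailable.set(false);   // consume the signal before the next scan
        } finally { lock.unlock(); }
    }

    @Override
    public void run() {
        while (true) {
            boolean stopping = !running; // read first, so messages queued before stop() are not missed
            boolean sentAny = false;
            for (Queue<String> buf : buffers) {
                List<String> batch;
                synchronized (buf) { batch = new ArrayList<>(buf); buf.clear(); }
                if (batch.isEmpty()) continue;
                synchronized (sent) { sent.addAll(batch); }
                sentAny = true;
            }
            if (sentAny) continue;       // keep scanning while there is work
            if (stopping) return;
            try {
                waitUntilMessagesAreAvailable();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() { running = false; signalNotEmpty(); } // also releases a blocked sender

    int sentCount() { synchronized (sent) { return sent.size(); } }

    /** Starts the loop, sends three messages, stops, and returns how many were "sent". */
    static int demo() {
        SenderLoopSketch s = new SenderLoopSketch(2);
        Thread t = new Thread(s, "sender-sketch");
        t.start();
        s.send(0, "a"); s.send(1, "b"); s.send(0, "c");
        s.stop();
        try { t.join(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return s.sentCount();
    }

    public static void main(String[] args) { System.out.println(demo()); } // prints 3
}
```

Setting the flag under the same lock that guards the wait closes the window in which a message could be queued between an empty scan and the call to await().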
-
viewChange
public void viewChange(View view)
- Specified by:
viewChange in interface Bundler
- Overrides:
viewChange in class BaseBundler
-
signalNotEmpty
protected void signalNotEmpty()
-
waitUntilMessagesAreAvailable
protected void waitUntilMessagesAreAvailable()
-
-