Package org.jgroups.protocols
Class RingBufferBundlerLockless
- java.lang.Object
  - org.jgroups.protocols.BaseBundler
    - org.jgroups.protocols.RingBufferBundlerLockless
- All Implemented Interfaces:
- Bundler
public class RingBufferBundlerLockless extends BaseBundler
Bundler which doesn't use locks but relies on CAS. There is one reader thread, which gets unparked by exactly one writer when the max size has been exceeded or when no other threads are sending messages. TODO: needs to be changed to support loopback (https://issues.redhat.com/browse/JGRP-2831)
- Since:
- 4.0
- Author:
- Bela Ban
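The class description says that exactly one writer unparks the single reader thread. A minimal sketch of how such a hand-off can be arranged with CAS is given here. It is an illustration under stated assumptions, not the JGroups implementation: the class `CasUnparkSketch` and the methods `beforeWrite`, `afterWrite` and `afterDrain` are hypothetical, and the field names are borrowed from the Field Summary only to make the mapping easier to follow.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Illustrative sketch only: NOT the code of RingBufferBundlerLockless.
class CasUnparkSketch {
    final AtomicInteger num_threads = new AtomicInteger();      // writers currently sending
    final AtomicLong accumulated_bytes = new AtomicLong();      // bytes queued since the last drain
    final AtomicBoolean unparking = new AtomicBoolean(false);   // true while a wake-up is pending
    final long max_size;                                        // assumed size threshold
    volatile Thread reader;                                     // the single reader thread, if started

    CasUnparkSketch(long max_size) {
        this.max_size = max_size;
    }

    /** A writer announces itself before storing a message (hypothetical helper). */
    void beforeWrite() {
        num_threads.incrementAndGet();
    }

    /**
     * Called by a writer after it has stored a message of msg_size bytes.
     * Returns true if this writer is the one that woke the reader.
     */
    boolean afterWrite(int msg_size) {
        boolean full = accumulated_bytes.addAndGet(msg_size) >= max_size;
        boolean last_writer = num_threads.decrementAndGet() == 0;
        if (!full && !last_writer)
            return false;                          // neither wake-up condition holds
        if (!unparking.compareAndSet(false, true))
            return false;                          // another writer already won the CAS
        Thread t = reader;
        if (t != null)
            LockSupport.unpark(t);                 // wake the reader exactly once
        return true;
    }

    /** Called by the reader after it has drained the buffer (hypothetical helper). */
    void afterDrain() {
        accumulated_bytes.set(0);
        unparking.set(false);                      // allow the next wake-up
    }
}
```

The `compareAndSet(false, true)` call is what limits the wake-up to a single writer: every writer that sees one of the two conditions races on the flag, and only the winner calls `LockSupport.unpark`. In this sketch the reader resets the flag once it has drained the buffer.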
-
-
Field Summary
Fields (Modifier and Type, Field):
protected java.util.concurrent.atomic.AtomicLong accumulated_bytes
protected Message[] buf
protected Runner bundler_thread
protected java.util.concurrent.atomic.AtomicInteger num_threads
protected int read_index
protected java.lang.Runnable run_function
protected java.util.concurrent.atomic.AtomicInteger size
protected static java.lang.String THREAD_NAME
protected java.util.concurrent.atomic.AtomicInteger tmp_write_index
protected java.util.concurrent.atomic.AtomicBoolean unparking
protected int write_index
protected java.util.concurrent.atomic.AtomicInteger write_permits
-
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_fill_count, avg_remove_queue_size, avg_send_time, capacity, count, drop_when_full, FMT, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, num_batches_sent, num_drops_on_full_queue, num_sends_because_full_queue, num_sends_because_no_msgs, num_single_msgs_sent, output, remove_queue_capacity, suppress_log, suppress_log_timeout, total_msgs_sent, transport, use_ringbuffer, use_single_sender_thread
-
-
Constructor Summary
Constructors:
RingBufferBundlerLockless()
RingBufferBundlerLockless(int capacity)
-
Method Summary
Methods (Modifier and Type, Method, Description):
int _readMessages()
protected int advanceWriteIndex()
protected static int assertPositive(int value, java.lang.String message)
protected int getPermitToWrite()
int getQueueSize()
  If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.
protected int getWriteIndex()
protected int increment(int index)
protected int index(int idx)
void init(TP transport)
  Called after creation of the bundler
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int available_msgs, int max_bundle_size)
int readIndex()
protected void readMessages()
void renameThread()
void reset()
void send(Message msg)
protected int sendBundledMessages(Message[] buf, int read_index, int available_msgs)
  Read and send messages in range [read-index ..
int size()
  Returns the total number of messages in the hashmap
void start()
  Called after Bundler.init(TP)
void stop()
java.lang.String toString()
int writeIndex()
-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, avgBatchSize, dropWhenFull, dropWhenFull, getCapacity, getMaxSize, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, removeQueueCapacity, removeQueueCapacity, resetStats, sendBundledMessages, sendMessageList, sendMessageListArray, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize, useRingBuffer, useRingBuffer, useSingleSenderThread, useSingleSenderThread, viewChange
-
-
-
-
Field Detail
-
buf
protected Message[] buf
-
read_index
protected int read_index
-
write_index
protected volatile int write_index
-
tmp_write_index
protected final java.util.concurrent.atomic.AtomicInteger tmp_write_index
-
write_permits
protected final java.util.concurrent.atomic.AtomicInteger write_permits
-
size
protected final java.util.concurrent.atomic.AtomicInteger size
-
num_threads
protected final java.util.concurrent.atomic.AtomicInteger num_threads
-
accumulated_bytes
protected final java.util.concurrent.atomic.AtomicLong accumulated_bytes
-
unparking
protected final java.util.concurrent.atomic.AtomicBoolean unparking
-
bundler_thread
protected Runner bundler_thread
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
- Constant Field Values
-
run_function
protected final java.lang.Runnable run_function
-
-
Method Detail
-
readIndex
public int readIndex()
-
writeIndex
public int writeIndex()
-
size
public int size()
Description copied from class: BaseBundler
Returns the total number of messages in the hashmap
- Specified by:
- size in interface Bundler
- Overrides:
- size in class BaseBundler
-
getQueueSize
public int getQueueSize()
Description copied from interface: Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.
- Specified by:
- getQueueSize in interface Bundler
- Overrides:
- getQueueSize in class BaseBundler
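To illustrate the getQueueSize() contract, the sketch below shows how a caller might treat the return value: a negative value means the queue is not managed, and a non-negative value feeds a drop decision. This is a deliberately simplified, hypothetical model and not JGroups code. The class `QueueDisciplineSketch`, the method `dropProbability` and the thresholds `min` and `max` are assumptions, and real Random Early Detection works on an averaged queue size rather than the instantaneous one used here.

```java
// Illustrative sketch only: a simplified linear drop curve, not the JGroups RED protocol.
class QueueDisciplineSketch {
    /**
     * Maps a queue size to a drop probability in [0.0, 1.0].
     * queue_size is assumed to come from getQueueSize(); min and max are
     * hypothetical thresholds with min < max.
     */
    static double dropProbability(int queue_size, int min, int max) {
        if (queue_size < 0)
            return 0.0;                                   // -1: the queue is not managed
        if (queue_size <= min)
            return 0.0;                                   // below the lower threshold: never drop
        if (queue_size >= max)
            return 1.0;                                   // at or above the upper threshold: always drop
        return (queue_size - min) / (double) (max - min); // linear ramp in between
    }
}
```

Because such a check can run for every message sent, the value it consumes has to be cheap to obtain, which matches the note above that getQueueSize() needs to be fast.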
-
init
public void init(TP transport)
Description copied from interface: Bundler
Called after creation of the bundler
- Specified by:
- init in interface Bundler
- Overrides:
- init in class BaseBundler
- Parameters:
- transport - the transport, for further reference
-
reset
public void reset()
-
start
public void start()
Description copied from interface: Bundler
Called after Bundler.init(TP)
- Specified by:
- start in interface Bundler
- Overrides:
- start in class BaseBundler
-
stop
public void stop()
- Specified by:
- stop in interface Bundler
- Overrides:
- stop in class BaseBundler
-
renameThread
public void renameThread()
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
- send in interface Bundler
- Overrides:
- send in class BaseBundler
- Throws:
- java.lang.Exception
-
getWriteIndex
protected int getWriteIndex()
-
getPermitToWrite
protected int getPermitToWrite()
-
advanceWriteIndex
protected int advanceWriteIndex()
-
readMessages
protected void readMessages()
-
sendBundledMessages
protected int sendBundledMessages(Message[] buf, int read_index, int available_msgs)
Read and send messages in range [read-index .. read-index+available_msgs-1]
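The range [read-index .. read-index+available_msgs-1] can run past the end of the backing array, so a reader has to wrap indices around. The sketch below shows one way to walk such a range. It is an assumption-laden illustration, not the method body: the class `RingRangeSketch` and the method `readRange` are hypothetical, and the modulo wrap-around rule in `index` is assumed because the bodies of index(int) and increment(int) are not shown on this page.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: NOT the body of sendBundledMessages().
class RingRangeSketch {
    /** Assumed wrap-around rule: positions are taken modulo the buffer capacity. */
    static int index(int idx, int capacity) {
        return Math.floorMod(idx, capacity);
    }

    /**
     * Collects the elements in [read_index .. read_index+available_msgs-1],
     * wrapping past the end of buf, and nulls each slot that was read.
     */
    static <T> List<T> readRange(T[] buf, int read_index, int available_msgs) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < available_msgs; i++) {
            int pos = index(read_index + i, buf.length);
            out.add(buf[pos]);
            buf[pos] = null;   // release the slot so a writer can reuse it
        }
        return out;
    }
}
```

With a four-slot buffer, reading three elements from position 2 visits slots 2, 3 and 0 in that order, which is the wrap-around case the range notation implies.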
-
toString
public java.lang.String toString()
- Overrides:
- toString in class java.lang.Object
-
_readMessages
public int _readMessages()
-
marshalMessagesToSameDestination
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int available_msgs, int max_bundle_size) throws java.lang.Exception
- Throws:
java.lang.Exception
-
increment
protected final int increment(int index)
-
index
protected final int index(int idx)
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-