Package org.jgroups.protocols
Class RingBufferBundlerLockless2
java.lang.Object
  org.jgroups.protocols.BaseBundler
    org.jgroups.protocols.RingBufferBundlerLockless2
- All Implemented Interfaces:
Bundler
public class RingBufferBundlerLockless2 extends BaseBundler
Lockless bundler using a reader thread which is unparked by (exactly one) writer thread. TODO: needs to be changed to support loopback (https://issues.redhat.com/browse/JGRP-2831)
- Since:
- 4.0
- Author:
- Bela Ban
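The class description can be illustrated with a minimal sketch of the general pattern: many writers publish work, but only the writer that wins a compare-and-set on a flag unparks the single reader thread. This is not the class's actual code. The class name UnparkSketch, its counters, and the timed park (used here as a safety net against a missed wake-up) are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch: one reader thread, many writers, at most one writer unparks at a time.
class UnparkSketch {
    private final AtomicInteger pending = new AtomicInteger();    // work published by writers
    private final AtomicInteger consumed = new AtomicInteger();   // work taken by the reader
    private final AtomicBoolean unparking = new AtomicBoolean();  // true while a writer is waking the reader
    private volatile boolean running = true;
    private final Thread reader = new Thread(this::readLoop, "reader-sketch");

    void start() { reader.start(); }

    void stop() throws InterruptedException {
        running = false;
        LockSupport.unpark(reader); // make sure the reader notices the stop request
        reader.join();
    }

    // Called by any number of writer threads.
    void send() {
        pending.incrementAndGet();
        // Only the writer that wins the CAS pays for the unpark call.
        if (unparking.compareAndSet(false, true)) {
            try {
                LockSupport.unpark(reader);
            } finally {
                unparking.set(false);
            }
        }
    }

    int consumed() { return consumed.get(); }

    private void readLoop() {
        while (running) {
            int n = pending.getAndSet(0);
            if (n > 0)
                consumed.addAndGet(n);
            else
                LockSupport.parkNanos(1_000_000L); // assumption: timed park guards against a skipped unpark
        }
        consumed.addAndGet(pending.getAndSet(0)); // drain whatever arrived before stop()
    }

    // Runs the given number of writers, each sending perWriter items, and returns the consumed total.
    static int runDemo(int writers, int perWriter) {
        UnparkSketch s = new UnparkSketch();
        s.start();
        Thread[] threads = new Thread[writers];
        for (int i = 0; i < writers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perWriter; j++)
                    s.send();
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads)
                t.join();
            s.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return s.consumed();
    }
}
```

In this sketch a writer that loses the compare-and-set skips the unpark entirely, which keeps the send path cheap under contention. The timed park bounds the delay if that skipped unpark happened to be the one the reader needed.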
Field Summary
Fields (modifier and type, field)
protected java.util.concurrent.atomic.AtomicLong accumulated_bytes
protected Message[] buf
protected Runner bundler_thread
static Message NULL_MSG
protected java.util.concurrent.atomic.AtomicInteger num_threads
protected java.util.concurrent.atomic.AtomicInteger read_index
protected int ri
protected java.lang.Runnable run_function
protected static java.lang.String THREAD_NAME
protected java.util.concurrent.atomic.AtomicBoolean unparking
protected java.util.concurrent.atomic.AtomicInteger write_index
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_fill_count, avg_remove_queue_size, avg_send_time, capacity, count, drop_when_full, FMT, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, num_batches_sent, num_drops_on_full_queue, num_sends_because_full_queue, num_sends_because_no_msgs, num_single_msgs_sent, output, remove_queue_capacity, suppress_log, suppress_log_timeout, total_msgs_sent, transport, use_ringbuffer, use_single_sender_thread
Constructor Summary
Constructors
RingBufferBundlerLockless2()
RingBufferBundlerLockless2(int capacity)
RingBufferBundlerLockless2(int capacity, boolean padded)
-
Method Summary
Methods (modifier and type, method, description)
int _readMessages()
protected int _size(int ri, int wi)
protected boolean advanceReadIndex(int wi)
protected static int assertPositive(int value, java.lang.String message)
int getQueueSize()
  If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.
protected int getWriteIndex(int current_read_index)
protected int increment(int index)
protected int index(int idx)
void init(TP transport)
  Called after creation of the bundler
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size)
int readIndex()
protected void readMessages()
void renameThread()
RingBufferBundlerLockless2 reset()
void send(Message msg)
protected int sendBundledMessages(Message[] buf, int read_index, int write_index)
  Read and send messages in range [read_index+1 .. write_index-1]
int size()
  Returns the total number of messages in the hashmap
void start()
  Called after Bundler.init(TP)
void stop()
java.lang.String toString()
protected void unparkIfNeeded(int size)
int writeIndex()
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, avgBatchSize, dropWhenFull, dropWhenFull, getCapacity, getMaxSize, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, removeQueueCapacity, removeQueueCapacity, resetStats, sendBundledMessages, sendMessageList, sendMessageListArray, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize, useRingBuffer, useRingBuffer, useSingleSenderThread, useSingleSenderThread, viewChange
Field Detail
-
buf
protected Message[] buf
-
read_index
protected final java.util.concurrent.atomic.AtomicInteger read_index
-
ri
protected int ri
-
write_index
protected final java.util.concurrent.atomic.AtomicInteger write_index
-
accumulated_bytes
protected final java.util.concurrent.atomic.AtomicLong accumulated_bytes
-
num_threads
protected final java.util.concurrent.atomic.AtomicInteger num_threads
-
unparking
protected final java.util.concurrent.atomic.AtomicBoolean unparking
-
bundler_thread
protected Runner bundler_thread
-
run_function
protected final java.lang.Runnable run_function
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
-
NULL_MSG
public static final Message NULL_MSG
-
-
Method Detail
-
readIndex
public int readIndex()
-
writeIndex
public int writeIndex()
-
reset
public RingBufferBundlerLockless2 reset()
-
getQueueSize
public int getQueueSize()
Description copied from interface: Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.
- Specified by:
getQueueSize in interface Bundler
- Overrides:
getQueueSize in class BaseBundler
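To make the contract concrete, here is a hedged sketch of how a queuing discipline might consume the value returned by getQueueSize(). The class RedSketch, its method, and the thresholds are hypothetical and are not part of JGroups. The linear ramp is a simplified form of Random Early Detection that ignores the averaged queue length used by the full algorithm.

```java
// Hypothetical sketch of a RED-like decision based on a reported queue size.
class RedSketch {
    /**
     * Returns a drop probability in [0, 1] for the given queue size.
     * A size of -1 means the bundler's queue is not managed, so nothing is dropped.
     * minThreshold and maxThreshold are assumed parameters with minThreshold < maxThreshold.
     */
    static double dropProbability(int queueSize, int minThreshold, int maxThreshold) {
        if (queueSize < 0 || queueSize < minThreshold)
            return 0.0; // unmanaged queue, or queue still short
        if (queueSize >= maxThreshold)
            return 1.0; // queue too long: always drop
        // Linear ramp between the two thresholds
        return (double) (queueSize - minThreshold) / (maxThreshold - minThreshold);
    }
}
```

Because a caller like this may run on every message sent, the size lookup it depends on has to be cheap, which is the reason for the note about speed above.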
-
size
public int size()
Description copied from class: BaseBundler
Returns the total number of messages in the hashmap
- Specified by:
size in interface Bundler
- Overrides:
size in class BaseBundler
-
_size
protected int _size(int ri, int wi)
-
init
public void init(TP transport)
Description copied from interface: Bundler
Called after creation of the bundler
- Specified by:
init in interface Bundler
- Overrides:
init in class BaseBundler
- Parameters:
transport - the transport, for further reference
-
start
public void start()
Description copied from interface: Bundler
Called after Bundler.init(TP)
- Specified by:
start in interface Bundler
- Overrides:
start in class BaseBundler
-
stop
public void stop()
- Specified by:
stop in interface Bundler
- Overrides:
stop in class BaseBundler
-
renameThread
public void renameThread()
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
send in interface Bundler
- Overrides:
send in class BaseBundler
- Throws:
java.lang.Exception
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.lang.Object
-
unparkIfNeeded
protected void unparkIfNeeded(int size)
-
getWriteIndex
protected int getWriteIndex(int current_read_index)
-
_readMessages
public int _readMessages()
-
advanceReadIndex
protected boolean advanceReadIndex(int wi)
-
readMessages
protected void readMessages()
-
sendBundledMessages
protected int sendBundledMessages(Message[] buf, int read_index, int write_index)
Read and send messages in range [read_index+1 .. write_index-1]
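The exclusive range between the two indices can wrap around the end of the buffer. The sketch below shows one way to walk it. It is a sketch under assumptions, not the class's implementation: the class RingRangeSketch and the explicit capacity parameter are hypothetical, and the helpers only mirror the names increment and _size from the method list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ring-buffer index arithmetic for the range (ri .. wi), both ends exclusive.
class RingRangeSketch {
    // Advances an index by one slot, wrapping to 0 at the end of the buffer.
    static int increment(int index, int capacity) {
        return index + 1 == capacity ? 0 : index + 1;
    }

    // Number of slots strictly between the read index and the write index.
    // Assumption of this sketch: ri == wi is treated as a wrapped range of capacity - 1 slots.
    static int size(int ri, int wi, int capacity) {
        return ri < wi ? wi - ri - 1 : capacity - ri - 1 + wi;
    }

    // Lists the slots ri+1 .. wi-1 in the order a reader would visit them.
    static List<Integer> slotsBetween(int ri, int wi, int capacity) {
        List<Integer> slots = new ArrayList<>();
        for (int i = increment(ri, capacity); i != wi; i = increment(i, capacity))
            slots.add(i);
        return slots;
    }
}
```

For example, with a capacity of 8, a read index of 6 and a write index of 2, the reader visits slots 7, 0 and 1, and size returns 3.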
-
marshalMessagesToSameDestination
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size) throws java.lang.Exception
- Throws:
java.lang.Exception
-
increment
protected final int increment(int index)
-
index
protected final int index(int idx)
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-