Package org.jgroups.protocols
Class RingBufferBundlerLockless
- java.lang.Object
  - org.jgroups.protocols.BaseBundler
    - org.jgroups.protocols.RingBufferBundlerLockless
- All Implemented Interfaces:
Bundler
public class RingBufferBundlerLockless extends BaseBundler
A bundler that does not use locks and relies on compare-and-set (CAS) operations instead. A single reader thread is unparked by exactly one writer when the max size has been exceeded or when no other threads are sending messages. TODO: needs to be changed to support loopback (https://issues.redhat.com/browse/JGRP-2831)
- Since:
- 4.0
- Author:
- Bela Ban
-
-
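The coordination described in the class description (writers that reserve slots with CAS, and a single reader that parks until exactly one writer unparks it) can be illustrated with a minimal sketch. This is not the JGroups implementation: the class CasRingSketch, its fields, and its methods send, drain and readerLoop are hypothetical names chosen for illustration, messages are plain byte arrays, and the byte accounting is deliberately approximate.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical sketch of a lock-free ring buffer; NOT the JGroups code. */
final class CasRingSketch {
    private final AtomicReferenceArray<byte[]> buf;
    private final int capacity;
    private final long maxBytes;
    private final AtomicLong nextTicket = new AtomicLong();       // ever-increasing write position
    private final AtomicInteger size = new AtomicInteger();       // slots reserved by writers
    private final AtomicInteger numSenders = new AtomicInteger(); // writers currently inside send()
    private final AtomicLong accumulatedBytes = new AtomicLong(); // approximate bytes since last drain
    private final AtomicBoolean unparking = new AtomicBoolean();  // true while a wake-up is pending
    private volatile Thread reader;                               // the single reader, or null
    private int readIndex;                                        // touched by the reader only

    CasRingSketch(int capacity, long maxBytes) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be > 0");
        this.capacity = capacity;
        this.maxBytes = maxBytes;
        this.buf = new AtomicReferenceArray<>(capacity);
    }

    /** Called by any number of writer threads; returns false if the buffer is full. */
    boolean send(byte[] msg) {
        numSenders.incrementAndGet();
        long bytes = 0;
        try {
            int s;
            do { // reserve a slot with CAS instead of taking a lock
                s = size.get();
                if (s >= capacity) return false;
            } while (!size.compareAndSet(s, s + 1));
            int slot = (int) (nextTicket.getAndIncrement() % capacity);
            buf.set(slot, msg); // publish the message to the reader
            bytes = accumulatedBytes.addAndGet(msg.length);
            return true;
        } finally {
            // Wake the reader if no other writer is sending or enough bytes piled up.
            // The CAS on 'unparking' ensures that only one writer performs the unpark.
            boolean lastSender = numSenders.decrementAndGet() == 0;
            if ((lastSender || bytes >= maxBytes) && unparking.compareAndSet(false, true))
                LockSupport.unpark(reader); // no effect while reader is still null
        }
    }

    /** Reader side: removes all published messages in order and returns their count. */
    int drain(List<byte[]> out) {
        unparking.set(false);    // re-arm before reading so that no wake-up is lost
        accumulatedBytes.set(0); // approximate: may race with concurrent writers
        int n = 0;
        byte[] msg;
        while ((msg = buf.get(readIndex)) != null) {
            buf.set(readIndex, null);               // free the slot first ...
            readIndex = (readIndex + 1) % capacity;
            size.decrementAndGet();                 // ... then release the reservation
            out.add(msg);
            n++;
        }
        return n;
    }

    /** Body of the single reader thread; exits when the thread is interrupted. */
    void readerLoop(List<byte[]> out) {
        reader = Thread.currentThread();
        while (!Thread.currentThread().isInterrupted()) {
            if (drain(out) == 0)
                LockSupport.park(this);
        }
    }
}
```

In this sketch the reader clears the pending flag before it scans the buffer, so a writer that publishes during the scan can still set the flag and unpark the reader; resetting the flag after the scan could lose that wake-up.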
Field Summary
Fields
Modifier and Type | Field | Description
protected java.util.concurrent.atomic.AtomicLong
accumulated_bytes
protected Message[]
buf
protected Runner
bundler_thread
protected java.util.concurrent.atomic.AtomicInteger
num_threads
protected int
read_index
protected java.lang.Runnable
run_function
protected java.util.concurrent.atomic.AtomicInteger
size
protected static java.lang.String
THREAD_NAME
protected java.util.concurrent.atomic.AtomicInteger
tmp_write_index
protected java.util.concurrent.atomic.AtomicBoolean
unparking
protected int
write_index
protected java.util.concurrent.atomic.AtomicInteger
write_permits
-
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_fill_count, avg_remove_queue_size, avg_send_time, capacity, count, drop_when_full, FMT, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, num_batches_sent, num_drops_on_full_queue, num_sends_because_full_queue, num_sends_because_no_msgs, num_single_msgs_sent, output, remove_queue_capacity, suppress_log, suppress_log_timeout, total_msgs_sent, transport, use_ringbuffer, use_single_sender_thread
-
-
Constructor Summary
Constructors
Constructor | Description
RingBufferBundlerLockless()
RingBufferBundlerLockless(int capacity)
-
Method Summary
Modifier and Type | Method | Description
int
_readMessages()
protected int
advanceWriteIndex()
protected static int
assertPositive(int value, java.lang.String message)
protected int
getPermitToWrite()
int
getQueueSize()
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.
protected int
getWriteIndex()
protected int
increment(int index)
protected int
index(int idx)
void
init(TP transport)
Called after creation of the bundler
protected int
marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int available_msgs, int max_bundle_size)
int
readIndex()
protected void
readMessages()
void
renameThread()
void
reset()
void
send(Message msg)
protected int
sendBundledMessages(Message[] buf, int read_index, int available_msgs)
Read and send messages in range [read_index .. read_index+available_msgs-1]
int
size()
Returns the total number of messages in the hashmap
void
start()
Called after Bundler.init(TP)
void
stop()
java.lang.String
toString()
int
writeIndex()
-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, avgBatchSize, dropWhenFull, dropWhenFull, getCapacity, getMaxSize, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, removeQueueCapacity, removeQueueCapacity, resetStats, sendBundledMessages, sendMessageList, sendMessageListArray, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize, useRingBuffer, useRingBuffer, useSingleSenderThread, useSingleSenderThread, viewChange
-
-
-
-
Field Detail
-
buf
protected Message[] buf
-
read_index
protected int read_index
-
write_index
protected volatile int write_index
-
tmp_write_index
protected final java.util.concurrent.atomic.AtomicInteger tmp_write_index
-
write_permits
protected final java.util.concurrent.atomic.AtomicInteger write_permits
-
size
protected final java.util.concurrent.atomic.AtomicInteger size
-
num_threads
protected final java.util.concurrent.atomic.AtomicInteger num_threads
-
accumulated_bytes
protected final java.util.concurrent.atomic.AtomicLong accumulated_bytes
-
unparking
protected final java.util.concurrent.atomic.AtomicBoolean unparking
-
bundler_thread
protected Runner bundler_thread
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
- Constant Field Values
-
run_function
protected final java.lang.Runnable run_function
-
-
Method Detail
-
readIndex
public int readIndex()
-
writeIndex
public int writeIndex()
-
size
public int size()
Description copied from class: BaseBundler
Returns the total number of messages in the hashmap
- Specified by:
size
in interface Bundler
- Overrides:
size
in class BaseBundler
-
getQueueSize
public int getQueueSize()
Description copied from interface: Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.
- Specified by:
getQueueSize
in interface Bundler
- Overrides:
getQueueSize
in class BaseBundler
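To illustrate how a queuing discipline might consume the value returned by getQueueSize(), here is a simplified sketch in the spirit of Random Early Detection. The class RedSketch, its methods, and the threshold parameters are assumptions made for illustration and are not part of JGroups; a full RED implementation would typically work on a smoothed average of the queue size rather than on the instantaneous value.

```java
/** Hypothetical, simplified RED-style drop decision; NOT a JGroups class. */
final class RedSketch {
    private RedSketch() {}

    /**
     * Maps a queue size to a drop probability in [0, 1].
     * A negative queue size (for example -1) means the queue is not managed,
     * so nothing is ever dropped.
     */
    static double dropProbability(int queueSize, int minThreshold, int maxThreshold) {
        if (queueSize < 0 || queueSize <= minThreshold)
            return 0.0;                        // unmanaged or lightly loaded queue
        if (queueSize >= maxThreshold)
            return 1.0;                        // queue at or above the upper threshold
        // Linear ramp between the two thresholds
        return (double) (queueSize - minThreshold) / (maxThreshold - minThreshold);
    }

    /** Decides whether to drop, given a random sample in [0, 1). */
    static boolean shouldDrop(int queueSize, int minThreshold, int maxThreshold, double sample) {
        return sample < dropProbability(queueSize, minThreshold, maxThreshold);
    }
}
```

Because a decision like this may run for every message, the sketch does only a few comparisons and one division, which is consistent with the requirement that getQueueSize() itself be fast.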
-
init
public void init(TP transport)
Description copied from interface: Bundler
Called after creation of the bundler
- Specified by:
init
in interface Bundler
- Overrides:
init
in class BaseBundler
- Parameters:
transport - the transport, for further reference
-
reset
public void reset()
-
start
public void start()
Description copied from interface: Bundler
Called after Bundler.init(TP)
- Specified by:
start
in interface Bundler
- Overrides:
start
in class BaseBundler
-
stop
public void stop()
- Specified by:
stop
in interface Bundler
- Overrides:
stop
in class BaseBundler
-
renameThread
public void renameThread()
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
send
in interface Bundler
- Overrides:
send
in class BaseBundler
- Throws:
java.lang.Exception
-
getWriteIndex
protected int getWriteIndex()
-
getPermitToWrite
protected int getPermitToWrite()
-
advanceWriteIndex
protected int advanceWriteIndex()
-
readMessages
protected void readMessages()
-
sendBundledMessages
protected int sendBundledMessages(Message[] buf, int read_index, int available_msgs)
Read and send messages in range [read_index .. read_index+available_msgs-1]
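The range above is expressed in ring-buffer positions, so it may run past the end of the array and continue at slot 0. The sketch below shows one way such a range could be mapped to array slots. It assumes that positions wrap modulo the buffer capacity, which this page does not state explicitly; the class RingRangeSketch and its method rangeSlots are hypothetical.

```java
/** Hypothetical helper showing wrap-around indexing; NOT part of JGroups. */
final class RingRangeSketch {
    private RingRangeSketch() {}

    /**
     * Returns the array slots covered by the range
     * [readIndex .. readIndex + availableMsgs - 1], wrapping at capacity.
     */
    static int[] rangeSlots(int readIndex, int availableMsgs, int capacity) {
        int[] slots = new int[availableMsgs];
        for (int i = 0; i < availableMsgs; i++)
            slots[i] = (readIndex + i) % capacity; // assumption: modulo wrap-around
        return slots;
    }
}
```

For example, with a capacity of 8, a read index of 6 and 4 available messages, the slots visited would be 6, 7, 0 and 1.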
-
toString
public java.lang.String toString()
- Overrides:
toString
in class java.lang.Object
-
_readMessages
public int _readMessages()
-
marshalMessagesToSameDestination
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int available_msgs, int max_bundle_size) throws java.lang.Exception
- Throws:
java.lang.Exception
-
increment
protected final int increment(int index)
-
index
protected final int index(int idx)
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-