Package org.jgroups.protocols
Class RingBufferBundlerLockless2
- java.lang.Object
-
- org.jgroups.protocols.BaseBundler
-
- org.jgroups.protocols.RingBufferBundlerLockless2
-
- All Implemented Interfaces:
Bundler
public class RingBufferBundlerLockless2 extends BaseBundler
Lockless bundler using a reader thread which is unparked by (exactly one) writer thread. TODO: needs to be changed to support loopback (https://issues.redhat.com/browse/JGRP-2831)- Since:
- 4.0
- Author:
- Bela Ban
-
-
Field Summary
Fields Modifier and Type Field Description protected java.util.concurrent.atomic.AtomicLong
accumulated_bytes
protected Message[]
buf
protected Runner
bundler_thread
static Message
NULL_MSG
protected java.util.concurrent.atomic.AtomicInteger
num_threads
protected java.util.concurrent.atomic.AtomicInteger
read_index
protected int
ri
protected java.lang.Runnable
run_function
protected static java.lang.String
THREAD_NAME
protected java.util.concurrent.atomic.AtomicBoolean
unparking
protected java.util.concurrent.atomic.AtomicInteger
write_index
-
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_send_time, capacity, count, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, output, transport
-
-
Constructor Summary
Constructors Constructor Description RingBufferBundlerLockless2()
RingBufferBundlerLockless2(int capacity)
RingBufferBundlerLockless2(int capacity, boolean padded)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description int
_readMessages()
protected int
_size(int ri, int wi)
protected boolean
advanceReadIndex(int wi)
protected static int
assertPositive(int value, java.lang.String message)
int
getQueueSize()
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.protected int
getWriteIndex(int current_read_index)
protected int
increment(int index)
protected int
index(int idx)
void
init(TP transport)
Called after creation of the bundlerprotected int
marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size)
int
readIndex()
protected void
readMessages()
void
renameThread()
RingBufferBundlerLockless2
reset()
void
send(Message msg)
protected int
sendBundledMessages(Message[] buf, int read_index, int write_index)
Read and send messages in range [read-index+1 ..int
size()
Returns the total number of messages in the hashmapvoid
start()
Called afterBundler.init(TP)
void
stop()
java.lang.String
toString()
protected void
unparkIfNeeded(int size)
int
writeIndex()
-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, getCapacity, getMaxSize, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, resetStats, sendBundledMessages, sendMessageList, sendMessageList, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize, viewChange
-
-
-
-
Field Detail
-
buf
protected Message[] buf
-
read_index
protected final java.util.concurrent.atomic.AtomicInteger read_index
-
ri
protected int ri
-
write_index
protected final java.util.concurrent.atomic.AtomicInteger write_index
-
accumulated_bytes
protected final java.util.concurrent.atomic.AtomicLong accumulated_bytes
-
num_threads
protected final java.util.concurrent.atomic.AtomicInteger num_threads
-
unparking
protected final java.util.concurrent.atomic.AtomicBoolean unparking
-
bundler_thread
protected Runner bundler_thread
-
run_function
protected final java.lang.Runnable run_function
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
-
NULL_MSG
public static final Message NULL_MSG
-
-
Method Detail
-
readIndex
public int readIndex()
-
writeIndex
public int writeIndex()
-
reset
public RingBufferBundlerLockless2 reset()
-
getQueueSize
public int getQueueSize()
Description copied from interface:Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.- Specified by:
getQueueSize
in interfaceBundler
- Overrides:
getQueueSize
in classBaseBundler
-
size
public int size()
Description copied from class:BaseBundler
Returns the total number of messages in the hashmap- Specified by:
size
in interfaceBundler
- Overrides:
size
in classBaseBundler
-
_size
protected int _size(int ri, int wi)
-
init
public void init(TP transport)
Description copied from interface:Bundler
Called after creation of the bundler- Specified by:
init
in interfaceBundler
- Overrides:
init
in classBaseBundler
- Parameters:
transport
- the transport, for further reference
-
start
public void start()
Description copied from interface:Bundler
Called afterBundler.init(TP)
- Specified by:
start
in interfaceBundler
- Overrides:
start
in classBaseBundler
-
stop
public void stop()
- Specified by:
stop
in interfaceBundler
- Overrides:
stop
in classBaseBundler
-
renameThread
public void renameThread()
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
send
in interfaceBundler
- Overrides:
send
in classBaseBundler
- Throws:
java.lang.Exception
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
unparkIfNeeded
protected void unparkIfNeeded(int size)
-
getWriteIndex
protected int getWriteIndex(int current_read_index)
-
_readMessages
public int _readMessages()
-
advanceReadIndex
protected boolean advanceReadIndex(int wi)
-
readMessages
protected void readMessages()
-
sendBundledMessages
protected int sendBundledMessages(Message[] buf, int read_index, int write_index)
Read and send messages in range [read-index+1 .. write_index-1]
-
marshalMessagesToSameDestination
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size) throws java.lang.Exception
- Throws:
java.lang.Exception
-
increment
protected final int increment(int index)
-
index
protected final int index(int idx)
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-