Package org.jgroups.protocols
Class RingBufferBundlerLockless2
- java.lang.Object
-
- org.jgroups.protocols.BaseBundler
-
- org.jgroups.protocols.RingBufferBundlerLockless2
-
- All Implemented Interfaces:
Bundler
public class RingBufferBundlerLockless2 extends BaseBundler
Lockless bundler using a reader thread which is unparked by (exactly one) writer thread. TODO: needs to be changed to support loopback (https://issues.redhat.com/browse/JGRP-2831)- Since:
- 4.0
- Author:
- Bela Ban
-
-
Field Summary
Fields Modifier and Type Field Description protected java.util.concurrent.atomic.AtomicLong
accumulated_bytes
protected Message[]
buf
protected Runner
bundler_thread
static Message
NULL_MSG
protected java.util.concurrent.atomic.AtomicInteger
num_threads
protected java.util.concurrent.atomic.AtomicInteger
read_index
protected int
ri
protected java.lang.Runnable
run_function
protected static java.lang.String
THREAD_NAME
protected java.util.concurrent.atomic.AtomicBoolean
unparking
protected java.util.concurrent.atomic.AtomicInteger
write_index
-
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_fill_count, avg_remove_queue_size, avg_send_time, capacity, count, drop_when_full, FMT, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, num_batches_sent, num_drops_on_full_queue, num_sends_because_full_queue, num_sends_because_no_msgs, num_single_msgs_sent, output, remove_queue_capacity, suppress_log, suppress_log_timeout, total_msgs_sent, transport, use_ringbuffer, use_single_sender_thread
-
-
Constructor Summary
Constructors Constructor Description RingBufferBundlerLockless2()
RingBufferBundlerLockless2(int capacity)
RingBufferBundlerLockless2(int capacity, boolean padded)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description int
_readMessages()
protected int
_size(int ri, int wi)
protected boolean
advanceReadIndex(int wi)
protected static int
assertPositive(int value, java.lang.String message)
int
getQueueSize()
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.protected int
getWriteIndex(int current_read_index)
protected int
increment(int index)
protected int
index(int idx)
void
init(TP transport)
Called after creation of the bundlerprotected int
marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size)
int
readIndex()
protected void
readMessages()
void
renameThread()
RingBufferBundlerLockless2
reset()
void
send(Message msg)
protected int
sendBundledMessages(Message[] buf, int read_index, int write_index)
Read and send messages in range [read-index+1 ..int
size()
Returns the total number of messages in the hashmapvoid
start()
Called afterBundler.init(TP)
void
stop()
java.lang.String
toString()
protected void
unparkIfNeeded(int size)
int
writeIndex()
-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, avgBatchSize, dropWhenFull, dropWhenFull, getCapacity, getMaxSize, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, removeQueueCapacity, removeQueueCapacity, resetStats, sendBundledMessages, sendMessageList, sendMessageListArray, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize, useRingBuffer, useRingBuffer, useSingleSenderThread, useSingleSenderThread, viewChange
-
-
-
-
Field Detail
-
buf
protected Message[] buf
-
read_index
protected final java.util.concurrent.atomic.AtomicInteger read_index
-
ri
protected int ri
-
write_index
protected final java.util.concurrent.atomic.AtomicInteger write_index
-
accumulated_bytes
protected final java.util.concurrent.atomic.AtomicLong accumulated_bytes
-
num_threads
protected final java.util.concurrent.atomic.AtomicInteger num_threads
-
unparking
protected final java.util.concurrent.atomic.AtomicBoolean unparking
-
bundler_thread
protected Runner bundler_thread
-
run_function
protected final java.lang.Runnable run_function
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
-
NULL_MSG
public static final Message NULL_MSG
-
-
Method Detail
-
readIndex
public int readIndex()
-
writeIndex
public int writeIndex()
-
reset
public RingBufferBundlerLockless2 reset()
-
getQueueSize
public int getQueueSize()
Description copied from interface:Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.- Specified by:
getQueueSize
in interfaceBundler
- Overrides:
getQueueSize
in classBaseBundler
-
size
public int size()
Description copied from class:BaseBundler
Returns the total number of messages in the hashmap- Specified by:
size
in interfaceBundler
- Overrides:
size
in classBaseBundler
-
_size
protected int _size(int ri, int wi)
-
init
public void init(TP transport)
Description copied from interface:Bundler
Called after creation of the bundler- Specified by:
init
in interfaceBundler
- Overrides:
init
in classBaseBundler
- Parameters:
transport
- the transport, for further reference
-
start
public void start()
Description copied from interface:Bundler
Called afterBundler.init(TP)
- Specified by:
start
in interfaceBundler
- Overrides:
start
in classBaseBundler
-
stop
public void stop()
- Specified by:
stop
in interfaceBundler
- Overrides:
stop
in classBaseBundler
-
renameThread
public void renameThread()
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
send
in interfaceBundler
- Overrides:
send
in classBaseBundler
- Throws:
java.lang.Exception
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
unparkIfNeeded
protected void unparkIfNeeded(int size)
-
getWriteIndex
protected int getWriteIndex(int current_read_index)
-
_readMessages
public int _readMessages()
-
advanceReadIndex
protected boolean advanceReadIndex(int wi)
-
readMessages
protected void readMessages()
-
sendBundledMessages
protected int sendBundledMessages(Message[] buf, int read_index, int write_index)
Read and send messages in range [read-index+1 .. write_index-1]
-
marshalMessagesToSameDestination
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size) throws java.lang.Exception
- Throws:
java.lang.Exception
-
increment
protected final int increment(int index)
-
index
protected final int index(int idx)
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-