Package org.jgroups.util
Class Table<T>
- java.lang.Object
-
- org.jgroups.util.Table<T>
-
- All Implemented Interfaces:
java.lang.Iterable<T>
public class Table<T> extends java.lang.Object implements java.lang.Iterable<T>
A store for elements (typically messages) to be retransmitted or delivered. Used on sender and receiver side.
Table maintains a matrix (an array of arrays) of elements, which are stored by mapping their seqno to an index. E.g. when we have 10 rows of 1000 elements each, and first_seqno is 3000, then an element with seqno=5600, will be stored in the 3rd row, at index 600.
Rows are removed when all elements in that row have been delivered.- Version:
- 3.1
- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
Table.HighestDeliverable
protected class
Table.Missing
protected class
Table.NumDeliverable
protected class
Table.Remover<R>
protected class
Table.TableIterator
Iterates through all elements of the matrix.static interface
Table.Visitor<T>
-
Field Summary
Fields Modifier and Type Field Description protected java.util.concurrent.atomic.AtomicInteger
adders
protected static long
DEFAULT_MAX_COMPACTION_TIME
protected static double
DEFAULT_RESIZE_FACTOR
protected int
elements_per_row
Must be a power of 2 for efficient modular arithmeticprotected long
hd
The highest delivered (= removed) seqnoprotected long
hr
The highest received seqnoprotected long
last_compaction_timestamp
The time when the last compaction took place.protected java.util.concurrent.locks.Lock
lock
protected long
low
The highest seqno purgedprotected T[][]
matrix
protected long
max_compaction_time
Time (in nanoseconds) after which a compaction should take place.protected int
num_compactions
protected int
num_moves
protected int
num_purges
protected int
num_resizes
protected int
num_rows
protected long
offset
The first seqno, at matrix[0][0]protected double
resize_factor
protected int
size
-
Constructor Summary
Constructors Constructor Description Table()
Table(int num_rows, int elements_per_row, long offset)
Table(int num_rows, int elements_per_row, long offset, double resize_factor)
Table(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time)
Creates a new tableTable(long offset)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description protected boolean
_add(long seqno, T element, boolean check_if_resize_needed, java.util.function.Predicate<T> remove_filter)
protected void
_compact()
Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly.T
_get(long seqno)
To be used only for testing; doesn't do any index or sanity checksboolean
add(long seqno, T element)
Adds an element if the element at the given index is null.boolean
add(long seqno, T element, java.util.function.Predicate<T> remove_filter)
Adds an element if the element at the given index is null.boolean
add(java.util.List<LongTuple<T>> list)
Adds elements from list to the tableboolean
add(java.util.List<LongTuple<T>> list, boolean remove_added_elements)
Adds elements from list to the table, removes elements from list that were not added to the tableboolean
add(java.util.List<LongTuple<T>> list, boolean remove_added_elements, T const_value)
Adds elements from the list to the tableboolean
add(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter)
boolean
add(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter, boolean remove_from_batch, T const_value)
Adds all messages from the given batch to the tableint
capacity()
Returns the total capacity in the matrixvoid
compact()
protected int
computeIndex(long seqno)
Computes and returns the index within a row for seqnoprotected int
computeRow(long seqno)
Computes and returns the row index for seqno.int
computeSize()
Iterate from low to hr and add up non-null values.java.lang.String
dump()
Dumps the seqnos in the table as a listprotected long
findHighestSeqno(java.util.List<LongTuple<T>> list)
protected static <T> long
findHighestSeqno(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter)
void
forEach(long from, long to, Table.Visitor<T> visitor)
Iterates over the matrix with range [from ..T
get(long seqno)
Returns an element at seqnojava.util.concurrent.atomic.AtomicInteger
getAdders()
long[]
getDigest()
int
getElementsPerRow()
long
getHighestDeliverable()
Returns the highest deliverable (= removable) seqno.long
getHighestDelivered()
long
getHighestReceived()
long
getLow()
long
getMaxCompactionTime()
SeqnoList
getMissing()
Returns a list of missing (= null) elementsSeqnoList
getMissing(int max_msgs)
Returns a list of missing messagesint
getNumCompactions()
int
getNumDeliverable()
Returns the number of messages that can be deliveredint
getNumMissing()
Returns the number of null elements in the range [hd+1 ..int
getNumMoves()
int
getNumPurges()
int
getNumResizes()
int
getNumRows()
long
getOffset()
protected T[]
getRow(int index)
Returns a row.boolean
isEmpty()
java.util.Iterator<T>
iterator()
java.util.Iterator<T>
iterator(long from, long to)
protected void
move(int num_rows)
Moves contents of matrix num_rows down.void
purge(long seqno)
Removes all elements less than or equal to seqno from the table.void
purge(long seqno, boolean force)
Removes all elements less than or equal to seqno from the table.T
remove()
T
remove(boolean nullify)
Removes the next non-null element and nulls the index if nullify=truejava.util.List<T>
removeMany(boolean nullify, int max_results)
java.util.List<T>
removeMany(boolean nullify, int max_results, java.util.function.Predicate<T> filter)
<R> R
removeMany(boolean nullify, int max_results, java.util.function.Predicate<T> filter, java.util.function.Supplier<R> result_creator, java.util.function.BiConsumer<R,T> accumulator)
Removes elements from the table and adds them to the result created by result_creator.void
resetStats()
protected void
resize(long seqno)
Moves rows down the matrix, by removing purged rows.Table<T>
setHighestDelivered(long seqno)
Only used internally by JGroups on a state transfer.Table<T>
setMaxCompactionTime(long max_compaction_time)
int
size()
Returns an appromximation of the number of elements in the tablejava.util.stream.Stream<T>
stream()
java.util.stream.Stream<T>
stream(long from, long to)
java.lang.String
toString()
-
-
-
Field Detail
-
num_rows
protected final int num_rows
-
elements_per_row
protected final int elements_per_row
Must be a power of 2 for efficient modular arithmetic
-
resize_factor
protected final double resize_factor
-
matrix
protected T[][] matrix
-
offset
protected long offset
The first seqno, at matrix[0][0]
-
size
protected int size
-
low
protected long low
The highest seqno purged
-
hr
protected long hr
The highest received seqno
-
hd
protected long hd
The highest delivered (= removed) seqno
-
max_compaction_time
protected long max_compaction_time
Time (in nanoseconds) after which a compaction should take place. 0 disables compaction
-
last_compaction_timestamp
protected long last_compaction_timestamp
The time when the last compaction took place. If acompact()
takes place and sees that the last compaction is more than max_compaction_time nanoseconds ago, a compaction will take place
-
lock
protected final java.util.concurrent.locks.Lock lock
-
adders
protected final java.util.concurrent.atomic.AtomicInteger adders
-
num_compactions
protected int num_compactions
-
num_resizes
protected int num_resizes
-
num_moves
protected int num_moves
-
num_purges
protected int num_purges
-
DEFAULT_MAX_COMPACTION_TIME
protected static final long DEFAULT_MAX_COMPACTION_TIME
- See Also:
- Constant Field Values
-
DEFAULT_RESIZE_FACTOR
protected static final double DEFAULT_RESIZE_FACTOR
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
Table
public Table()
-
Table
public Table(long offset)
-
Table
public Table(int num_rows, int elements_per_row, long offset)
-
Table
public Table(int num_rows, int elements_per_row, long offset, double resize_factor)
-
Table
public Table(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time)
Creates a new table- Parameters:
num_rows
- the number of rows in the matrixelements_per_row
- the number of elements per rowoffset
- the seqno before the first seqno to be inserted. E.g. if 0 then the first seqno will be 1resize_factor
- the factor with which to increase the number of rowsmax_compaction_time
- the max time in milliseconds after we attempt a compaction
-
-
Method Detail
-
getAdders
public java.util.concurrent.atomic.AtomicInteger getAdders()
-
getOffset
public long getOffset()
-
getElementsPerRow
public int getElementsPerRow()
-
capacity
public int capacity()
Returns the total capacity in the matrix
-
getNumCompactions
public int getNumCompactions()
-
getNumMoves
public int getNumMoves()
-
getNumResizes
public int getNumResizes()
-
getNumPurges
public int getNumPurges()
-
size
public int size()
Returns an appromximation of the number of elements in the table
-
isEmpty
public boolean isEmpty()
-
getLow
public long getLow()
-
getHighestDelivered
public long getHighestDelivered()
-
getHighestReceived
public long getHighestReceived()
-
getMaxCompactionTime
public long getMaxCompactionTime()
-
getNumRows
public int getNumRows()
-
resetStats
public void resetStats()
-
getHighestDeliverable
public long getHighestDeliverable()
Returns the highest deliverable (= removable) seqno. This may be higher thangetHighestDelivered()
, e.g. if elements have been added but not yet removed
-
getNumDeliverable
public int getNumDeliverable()
Returns the number of messages that can be delivered
-
setHighestDelivered
public Table<T> setHighestDelivered(long seqno)
Only used internally by JGroups on a state transfer. Please don't use this in application code, or you're on your own !- Parameters:
seqno
-
-
add
public boolean add(long seqno, T element)
Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.- Parameters:
seqno
-element
-- Returns:
- True if the element at the computed index was null, else false
-
add
public boolean add(long seqno, T element, java.util.function.Predicate<T> remove_filter)
Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.- Parameters:
seqno
-element
-remove_filter
- If not null, a filter used to remove all consecutive messages passing the filter- Returns:
- True if the element at the computed index was null, else false
-
add
public boolean add(java.util.List<LongTuple<T>> list)
Adds elements from list to the table- Parameters:
list
-- Returns:
- True if at least 1 element was added successfully
-
add
public boolean add(java.util.List<LongTuple<T>> list, boolean remove_added_elements)
Adds elements from list to the table, removes elements from list that were not added to the table- Parameters:
list
-- Returns:
- True if at least 1 element was added successfully. This guarantees that the list has at least 1 element
-
add
public boolean add(java.util.List<LongTuple<T>> list, boolean remove_added_elements, T const_value)
Adds elements from the list to the table- Parameters:
list
- The list of tuples of seqnos and elements. If remove_added_elements is true, if elements could not be added to the table (e.g. because they were already present or the seqno was < HD), those elements will be removed from listremove_added_elements
- If true, elements that could not be added to the table are removed from listconst_value
- If non-null, this value should be used rather than the values of the list tuples- Returns:
- True if at least 1 element was added successfully, false otherwise.
-
add
public boolean add(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter)
-
add
public boolean add(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter, boolean remove_from_batch, T const_value)
Adds all messages from the given batch to the table- Parameters:
batch
- The batchseqno_getter
- A function to return the sequence number (seqno) of a given Message. Must be non-null. If the function return -1, then the message won't be addedremove_from_batch
- If true, the message is removedregardless
of whether it was added successfully or notconst_value
- If non-null, this value should be used rather than the values of the list tuples- Returns:
- True if at least 1 element was added successfully, false otherwise.
-
get
public T get(long seqno)
Returns an element at seqno- Parameters:
seqno
-- Returns:
-
_get
public T _get(long seqno)
To be used only for testing; doesn't do any index or sanity checks- Parameters:
seqno
-- Returns:
-
remove
public T remove()
-
remove
public T remove(boolean nullify)
Removes the next non-null element and nulls the index if nullify=true
-
removeMany
public java.util.List<T> removeMany(boolean nullify, int max_results)
-
removeMany
public java.util.List<T> removeMany(boolean nullify, int max_results, java.util.function.Predicate<T> filter)
-
removeMany
public <R> R removeMany(boolean nullify, int max_results, java.util.function.Predicate<T> filter, java.util.function.Supplier<R> result_creator, java.util.function.BiConsumer<R,T> accumulator)
Removes elements from the table and adds them to the result created by result_creator. Between 0 and max_results elements are removed. If no elements were removed, processing will be set to true while the table lock is held.- Type Parameters:
R
- the type of the result- Parameters:
nullify
- if true, the x,y location of the removed element in the matrix will be nulledmax_results
- the max number of results to be returned, even if more elements would be removablefilter
- a filter which accepts (or rejects) elements into the result. If null, all elements will be acceptedresult_creator
- a supplier required to create the result, e.g. ArrayList::newaccumulator
- an accumulator accepting the result and an element, e.g. ArrayList::add- Returns:
- the result
-
purge
public void purge(long seqno)
Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix and nulling all elements < index(seqno) of the first row that cannot be removed- Parameters:
seqno
-
-
purge
public void purge(long seqno, boolean force)
Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix and nulling all elements < index(seqno) of the first row that cannot be removed.- Parameters:
seqno
- All elements <= seqno will be nulledforce
- If true, we only ensure that seqno <= hr, but don't care about hd, and set hd=low=seqno.
-
compact
public void compact()
-
forEach
public void forEach(long from, long to, Table.Visitor<T> visitor)
Iterates over the matrix with range [from .. to] (including from and to), and callsTable.Visitor.visit(long,Object,int,int)
. If the visit() method returns false, the iteration is terminated. This method must be called with the lock held- Parameters:
from
- The starting seqnoto
- The ending seqno, the range is [from .. to] including from and tovisitor
- An instance of Visitor
-
iterator
public java.util.Iterator<T> iterator()
- Specified by:
iterator
in interfacejava.lang.Iterable<T>
-
iterator
public java.util.Iterator<T> iterator(long from, long to)
-
stream
public java.util.stream.Stream<T> stream()
-
stream
public java.util.stream.Stream<T> stream(long from, long to)
-
_add
protected boolean _add(long seqno, T element, boolean check_if_resize_needed, java.util.function.Predicate<T> remove_filter)
-
findHighestSeqno
protected static <T> long findHighestSeqno(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter)
-
resize
protected void resize(long seqno)
Moves rows down the matrix, by removing purged rows. If resizing to accommodate seqno is still needed, computes a new size. Then either moves existing rows down, or copies them into a new array (if resizing took place). The lock must be held by the caller of resize().
-
move
protected void move(int num_rows)
Moves contents of matrix num_rows down. Avoids a System.arraycopy(). Caller must hold the lock.
-
_compact
protected void _compact()
Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly. The capacity of the matrix should be size * resize_factor. Caller must hold the lock.
-
computeSize
public int computeSize()
Iterate from low to hr and add up non-null values. Caller must hold the lock.
-
getNumMissing
public int getNumMissing()
Returns the number of null elements in the range [hd+1 .. hr-1] excluding hd and hr
-
getMissing
public SeqnoList getMissing()
Returns a list of missing (= null) elements- Returns:
- A SeqnoList of missing messages, or null if no messages are missing
-
getMissing
public SeqnoList getMissing(int max_msgs)
Returns a list of missing messages- Parameters:
max_msgs
- If > 0, the max number of missing messages to be returned (oldest first), else no limit- Returns:
- A SeqnoList of missing messages, or null if no messages are missing
-
getDigest
public long[] getDigest()
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
dump
public java.lang.String dump()
Dumps the seqnos in the table as a list
-
getRow
protected T[] getRow(int index)
Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist- Parameters:
index
-- Returns:
- A row
-
computeRow
protected int computeRow(long seqno)
Computes and returns the row index for seqno. The caller must hold the lock.
-
computeIndex
protected int computeIndex(long seqno)
Computes and returns the index within a row for seqno
-
-