Package org.jgroups.protocols.pbcast
Class STABLE
- java.lang.Object
-
- org.jgroups.stack.Protocol
-
- org.jgroups.protocols.pbcast.STABLE
-
public class STABLE extends Protocol
Computes the broadcast messages that are stable; i.e., have been delivered by all members. Sends STABLE events down the stack when this is the case. This allows NAKACK{2,3} to garbage collect messages that have been seen by all members.Works as follows: periodically (desired_avg_gossip) or when having received a number of bytes (max_bytes), every member sends its digest (highest seqno delivered, received) to the current coordinator
The coordinator updates a stability vector, which maintains the highest seqno delivered/receive for each member and initially contains no data, when such a message is received.
When messages from all members have been received, a stability message is mcast, which causes all members to send a STABLE event down the stack (triggering garbage collection in the NAKACK{2,3} layer).- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected classSTABLE.ResumeTaskstatic classSTABLE.StableHeaderprotected classSTABLE.StableTaskMcast periodic STABLE message.
-
Field Summary
Fields Modifier and Type Field Description protected Addresscoordinatorprotected longdesired_avg_gossipSends a STABLE gossip every 20 seconds on average.protected MutableDigestdigestprotected booleaninitializedprotected Addresslocal_addrprotected java.util.concurrent.locks.Locklockprotected longmax_bytesTotal amount of bytes from incoming messages (default = 0 = disabled).protected static longMAX_SUSPEND_TIMEprotected longnum_bytes_receivedThe total number of bytes received from unicast and multicast messagesprotected intnum_stability_msgs_receivedprotected intnum_stability_msgs_sentprotected intnum_stable_msgs_receivedprotected intnum_stable_msgs_sentprotected java.util.concurrent.locks.Lockreceivedprotected java.util.concurrent.Future<?>resume_task_futureprotected java.lang.Objectresume_task_mutexprotected booleansend_stable_msgs_to_coord_onlyDeprecated.protected longstability_delayDeprecated.protected java.util.concurrent.Future<?>stable_task_futureprotected java.util.concurrent.locks.Lockstable_task_lockprotected booleansuspendedWhen true, don't take part in garbage collection: neither send STABLE messages nor handle STABILITY messagesprotected TimeSchedulertimerprotected Viewviewprotected FixedSizeBitSetvotesKeeps track of who we already heard from (STABLE_GOSSIP msgs).-
Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, log, stack, stats, up_prot
-
-
Constructor Summary
Constructors Constructor Description STABLE()
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description protected booleanaddVote(int rank)Adds mbr to votes and returns true if we have all the votes, otherwise false.protected static booleanallVotesReceived(FixedSizeBitSet votes)Votes is already locked and guaranteed to be non-nullSTABLEdesiredAverageGossip(long g)java.lang.Objectdown(Event evt)An event is to be sent down the stack.java.lang.Objectdown(Message msg)A message is sent down the stack.voidgc()longgetBytes()longgetDesiredAverageGossip()protected DigestgetDigest()longgetMaxBytes()intgetNumVotes()protected static intgetRank(Address member, View v)intgetStabilityReceived()intgetStabilitySent()intgetStableReceived()intgetStableSent()booleangetStableTaskRunning()protected java.lang.Objecthandle(STABLE.StableHeader hdr, Address sender, Digest digest)protected voidhandleRegularMessage(Message msg)protected voidhandleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id)protected voidhandleStableMessage(Digest d, Address sender, ViewId view_id)Digest d contains (a) the highest seqnos deliverable for each sender and (b) the highest seqnos seen for each member.protected voidhandleUpEvent(STABLE.StableHeader hdr, Address sender, Digest digest)protected voidhandleViewChange(View v)voidinit()Called after instance has been created (null constructor) and before protocol is started.static Buffermarshal(Digest digest)protected booleanmaxBytesExceeded(int len)java.lang.StringprintDigest()protected java.lang.StringprintDigest(Digest digest)java.lang.StringprintVotes()protected DigestreadDigest(byte[] buffer, int offset, int length)java.util.List<java.lang.Integer>requiredDownServices()List of events that are required to be answered by some layer belowprotected voidresetDigest()protected voidresetNumBytes()voidresetStats()protected voidresume()protected voidsendStabilityMessage(Digest d, ViewId view_id)Sends a stability message to all members except self.protected voidsendStableMessage(boolean send_in_background)Broadcasts a STABLE message of the current digest to all members (or the coordinator only).voidsetDesiredAverageGossip(long gossip_interval)voidsetMaxBytes(long max_bytes)voidstart()This method is called on aJChannel.connect(String).protected voidstartResumeTask(long max_suspend_time)protected voidstartStableTask()voidstop()This method is called on aJChannel.disconnect().protected voidstopResumeTask()protected voidstopStableTask()protected voidsuspend(long timeout)java.lang.Objectup(Event evt)An event was received from the protocol below.java.lang.Objectup(Message msg)A single message was received.voidup(MessageBatch batch)Sends up a multiple messages in aMessageBatch.protected voidupdateLocalDigest(Digest d, Address sender)Update my own digest from a digest received by somebody else.-
Methods inherited from class org.jgroups.stack.Protocol
accept, afterCreationHook, destroy, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, providedDownServices, providedUpServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled
-
-
-
-
Field Detail
-
MAX_SUSPEND_TIME
protected static final long MAX_SUSPEND_TIME
- See Also:
- Constant Field Values
-
desired_avg_gossip
protected long desired_avg_gossip
Sends a STABLE gossip every 20 seconds on average. 0 disables gossiping of STABLE messages
-
stability_delay
@Deprecated protected long stability_delay
Deprecated.delay before we send STABILITY msg (give others a change to send first). This should be set to a very small number (> 0 !) ifmax_bytesis used
-
max_bytes
protected long max_bytes
Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE message will be broadcast andnum_bytes_receivedreset to 0 . If this is > 0, then ideallystability_delayshould be set to a low number as well
-
send_stable_msgs_to_coord_only
@Deprecated protected boolean send_stable_msgs_to_coord_only
Deprecated.
-
num_stable_msgs_sent
protected int num_stable_msgs_sent
-
num_stable_msgs_received
protected int num_stable_msgs_received
-
num_stability_msgs_sent
protected int num_stability_msgs_sent
-
num_stability_msgs_received
protected int num_stability_msgs_received
-
local_addr
protected Address local_addr
-
view
protected volatile View view
-
digest
protected volatile MutableDigest digest
-
votes
protected FixedSizeBitSet votes
Keeps track of who we already heard from (STABLE_GOSSIP msgs). This is all 0's, and we set the sender when a STABLE message is received. When the bitset is all 1's (responses from all members), we send a STABILITY message
-
lock
protected final java.util.concurrent.locks.Lock lock
-
stable_task_future
protected java.util.concurrent.Future<?> stable_task_future
-
stable_task_lock
protected final java.util.concurrent.locks.Lock stable_task_lock
-
timer
protected TimeScheduler timer
-
num_bytes_received
protected long num_bytes_received
The total number of bytes received from unicast and multicast messages
-
received
protected final java.util.concurrent.locks.Lock received
-
suspended
protected volatile boolean suspended
When true, don't take part in garbage collection: neither send STABLE messages nor handle STABILITY messages
-
initialized
protected boolean initialized
-
resume_task_future
protected java.util.concurrent.Future<?> resume_task_future
-
resume_task_mutex
protected final java.lang.Object resume_task_mutex
-
coordinator
protected volatile Address coordinator
-
-
Method Detail
-
getDesiredAverageGossip
public long getDesiredAverageGossip()
-
setDesiredAverageGossip
public void setDesiredAverageGossip(long gossip_interval)
-
desiredAverageGossip
public STABLE desiredAverageGossip(long g)
-
getMaxBytes
public long getMaxBytes()
-
setMaxBytes
public void setMaxBytes(long max_bytes)
-
getBytes
public long getBytes()
-
getStableSent
public int getStableSent()
-
getStableReceived
public int getStableReceived()
-
getStabilitySent
public int getStabilitySent()
-
getStabilityReceived
public int getStabilityReceived()
-
getNumVotes
public int getNumVotes()
-
getStableTaskRunning
public boolean getStableTaskRunning()
-
gc
public void gc()
-
printDigest
public java.lang.String printDigest()
-
printVotes
public java.lang.String printVotes()
-
resetStats
public void resetStats()
- Overrides:
resetStatsin classProtocol
-
requiredDownServices
public java.util.List<java.lang.Integer> requiredDownServices()
Description copied from class:ProtocolList of events that are required to be answered by some layer below- Overrides:
requiredDownServicesin classProtocol
-
suspend
protected void suspend(long timeout)
-
resume
protected void resume()
-
init
public void init() throws java.lang.ExceptionDescription copied from class:ProtocolCalled after instance has been created (null constructor) and before protocol is started. Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
-
start
public void start() throws java.lang.ExceptionDescription copied from class:ProtocolThis method is called on aJChannel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.- Overrides:
startin classProtocol- Throws:
java.lang.Exception- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, soJChannel.connect(String)will throw an exception
-
stop
public void stop()
Description copied from class:ProtocolThis method is called on aJChannel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed
-
up
public java.lang.Object up(Event evt)
Description copied from class:ProtocolAn event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally the event is either a) discarded, or b) an event is sent down the stack usingdown_prot.down()or c) the event (or another event) is sent up the stack usingup_prot.up().
-
up
public java.lang.Object up(Message msg)
Description copied from class:ProtocolA single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
handleUpEvent
protected void handleUpEvent(STABLE.StableHeader hdr, Address sender, Digest digest)
-
up
public void up(MessageBatch batch)
Description copied from class:ProtocolSends up a multiple messages in aMessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages. The default processing below sends messages up the stack individually, based on a matching criteria (callingProtocol.accept(org.jgroups.Message)), and - if true - callsProtocol.up(org.jgroups.Event)for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped. Subclasses should check if there are any messages destined for them (e.g. usingMessageBatch.getMatchingMessages(short,boolean)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
-
down
public java.lang.Object down(Message msg)
Description copied from class:ProtocolA message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it before passing it down.
-
down
public java.lang.Object down(Event evt)
Description copied from class:ProtocolAn event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack usingdown_prot.down().
-
handle
protected java.lang.Object handle(STABLE.StableHeader hdr, Address sender, Digest digest)
-
handleRegularMessage
protected void handleRegularMessage(Message msg)
-
maxBytesExceeded
protected boolean maxBytesExceeded(int len)
-
handleViewChange
protected void handleViewChange(View v)
-
updateLocalDigest
protected void updateLocalDigest(Digest d, Address sender)
Update my own digest from a digest received by somebody else. Returns whether the update was successful. Needs to be called with a lock on digest
-
resetDigest
protected void resetDigest()
-
addVote
protected boolean addVote(int rank)
Adds mbr to votes and returns true if we have all the votes, otherwise false.- Parameters:
rank-
-
allVotesReceived
protected static boolean allVotesReceived(FixedSizeBitSet votes)
Votes is already locked and guaranteed to be non-null
-
startStableTask
protected void startStableTask()
-
stopStableTask
protected void stopStableTask()
-
startResumeTask
protected void startResumeTask(long max_suspend_time)
-
stopResumeTask
protected void stopResumeTask()
-
handleStableMessage
protected void handleStableMessage(Digest d, Address sender, ViewId view_id)
Digest d contains (a) the highest seqnos deliverable for each sender and (b) the highest seqnos seen for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability message, which results in garbage collection of messages lower than the ones in the stability vector. The maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN for details).
-
resetNumBytes
protected void resetNumBytes()
-
handleStabilityMessage
protected void handleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id)
-
sendStableMessage
protected void sendStableMessage(boolean send_in_background)
Broadcasts a STABLE message of the current digest to all members (or the coordinator only). The message contains the highest seqno delivered and received for all members. The seqnos are retrieved from the NAKACK layer below.
-
readDigest
protected Digest readDigest(byte[] buffer, int offset, int length)
-
sendStabilityMessage
protected void sendStabilityMessage(Digest d, ViewId view_id)
Sends a stability message to all members except self.- Parameters:
d- A copy of the stability digest, so we don't need to copy it again
-
getDigest
protected Digest getDigest()
-
printDigest
protected java.lang.String printDigest(Digest digest)
-
-