Package org.jgroups.protocols.pbcast
Class StreamingStateTransfer
- java.lang.Object
-
- org.jgroups.stack.Protocol
-
- org.jgroups.protocols.pbcast.StreamingStateTransfer
-
- All Implemented Interfaces:
ProcessingQueue.Handler<Address>
- Direct Known Subclasses:
STATE,STATE_SOCK
public abstract class StreamingStateTransfer extends Protocol implements ProcessingQueue.Handler<Address>
Base class for state transfer protocols which use streaming (or chunking) to transfer state between two members. The major advantage of this approach is that transferring application state to a joining member of a group does not entail loading of the complete application state into memory. The application state, for example, might be located entirely on some form of disk based storage. The defaultSTATE_TRANSFERprotocol requires this state to be loaded entirely into memory before being transferred to a group member while the streaming state transfer protocols do not. Thus the streaming state transfer protocols are able to transfer application state that is very large (>1Gb) without a likelihood of the such transfer resulting in OutOfMemoryException. Note that prior to 3.0, there was only 1 streaming protocol: STATE. In 3.0 the functionality was split between STATE and STATE_SOCK, and common functionality moved up into StreamingStateTransfer.- Since:
- 3.0
- Author:
- Bela Ban, Vladimir Blagojevic
- See Also:
STATE_TRANSFER,STATE,STATE_SOCK
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected classStreamingStateTransfer.StateGetterThread which invokesStateListener.getState(java.io.OutputStream)in the applicationstatic classStreamingStateTransfer.StateHeader
-
Field Summary
Fields Modifier and Type Field Description protected doubleavg_state_sizeprotected intbuffer_sizeprotected booleanflushProtocolInStackprotected Addresslocal_addrprotected intmax_poolprotected java.util.List<Address>membersprotected java.util.concurrent.atomic.LongAddernum_bytes_sentprotected java.util.concurrent.atomic.LongAddernum_state_reqsprotected longpool_thread_keep_aliveprotected Addressstate_providerprotected ProcessingQueue<Address>state_requestersList of members requesting state.protected java.util.concurrent.ThreadPoolExecutorthread_poolThread pool (configured withmax_poolandpool_thread_keep_alive) to runStreamingStateTransfer.StateGetterthreads on-
Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, log, stack, stats, up_prot
-
-
Constructor Summary
Constructors Constructor Description StreamingStateTransfer()
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description protected voidclose(java.lang.Object resource)voidcloseBarrierAndSuspendStable()protected voidcloseHoleFor(Address member)protected abstract Tuple<java.io.InputStream,java.lang.Object>createStreamToProvider(Address provider, StreamingStateTransfer.StateHeader hdr)Creates an InputStream to the state provider to read the state.protected voidcreateStreamToRequester(Address requester)Creates an OutputStream to the state requester to write the stateprotected java.util.concurrent.ThreadPoolExecutorcreateThreadPool()voiddestroy()This method is called on aJChannel.close().protected AddressdetermineCoordinator()java.lang.Objectdown(Event evt)An event is to be sent down the stack.doublegetAverageStateSize()longgetNumberOfStateBytesSent()longgetNumberOfStateRequests()protected voidgetStateFromApplication(Address requester, java.io.OutputStream out, boolean use_separate_thread)longgetThreadPoolCompletedTasks()intgetThreadPoolSize()voidhandle(Address state_requester)protected voidhandleConfig(java.util.Map<java.lang.String,java.lang.Object> config)protected voidhandleEOF(Address sender)protected voidhandleException(java.lang.Throwable exception)protected voidhandleStateChunk(Address sender, byte[] buffer, int offset, int length)protected voidhandleStateReq(Address requester)protected voidhandleStateRsp(Address provider, StreamingStateTransfer.StateHeader hdr)protected voidhandleViewChange(View v)voidinit()Called after instance has been created (null constructor) and before protocol is started.protected booleanisDigestNeeded()When FLUSH is used we do not need to pass digests between members (see JGroups/doc/design/FLUSH.txt)protected voidmodifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)protected voidopenBarrier()voidopenBarrierAndResumeStable()protected voidpunchHoleFor(Address member)java.util.List<java.lang.Integer>requiredDownServices()List of events that are required to be answered by some layer belowvoidresetStats()protected voidresumeStable()protected voidsendEof(Address requester)protected voidsendException(Address requester, java.lang.Throwable exception)protected voidsetStateInApplication(java.io.InputStream in, java.lang.Object resource, Address provider)voidstart()This method is called on aJChannel.connect(String).voidstop()This method is called on aJChannel.disconnect().java.lang.Objectup(Event evt)An event was received from the protocol below.java.lang.Objectup(Message msg)A single message was received.protected booleanuseAsyncStateDelivery()-
Methods inherited from class org.jgroups.stack.Protocol
accept, afterCreationHook, down, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, providedDownServices, providedUpServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, up
-
-
-
-
Field Detail
-
buffer_size
protected int buffer_size
-
max_pool
protected int max_pool
-
pool_thread_keep_alive
protected long pool_thread_keep_alive
-
num_state_reqs
protected final java.util.concurrent.atomic.LongAdder num_state_reqs
-
num_bytes_sent
protected final java.util.concurrent.atomic.LongAdder num_bytes_sent
-
avg_state_size
protected double avg_state_size
-
local_addr
protected Address local_addr
-
state_provider
protected volatile Address state_provider
-
members
protected final java.util.List<Address> members
-
flushProtocolInStack
protected volatile boolean flushProtocolInStack
-
thread_pool
protected java.util.concurrent.ThreadPoolExecutor thread_pool
Thread pool (configured withmax_poolandpool_thread_keep_alive) to runStreamingStateTransfer.StateGetterthreads on
-
state_requesters
protected final ProcessingQueue<Address> state_requesters
List of members requesting state. Only a single state request is handled at any time
-
-
Method Detail
-
getNumberOfStateRequests
public long getNumberOfStateRequests()
-
getNumberOfStateBytesSent
public long getNumberOfStateBytesSent()
-
getAverageStateSize
public double getAverageStateSize()
-
getThreadPoolSize
public int getThreadPoolSize()
-
getThreadPoolCompletedTasks
public long getThreadPoolCompletedTasks()
-
requiredDownServices
public java.util.List<java.lang.Integer> requiredDownServices()
Description copied from class:ProtocolList of events that are required to be answered by some layer below- Overrides:
requiredDownServicesin classProtocol
-
resetStats
public void resetStats()
- Overrides:
resetStatsin classProtocol
-
init
public void init() throws java.lang.ExceptionDescription copied from class:ProtocolCalled after instance has been created (null constructor) and before protocol is started. Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
-
destroy
public void destroy()
Description copied from class:ProtocolThis method is called on aJChannel.close(). Does some cleanup; after the call the VM will terminate
-
start
public void start() throws java.lang.ExceptionDescription copied from class:ProtocolThis method is called on aJChannel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.- Overrides:
startin classProtocol- Throws:
java.lang.Exception- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, soJChannel.connect(String)will throw an exception
-
stop
public void stop()
Description copied from class:ProtocolThis method is called on aJChannel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed
-
down
public java.lang.Object down(Event evt)
Description copied from class:ProtocolAn event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack usingdown_prot.down().
-
up
public java.lang.Object up(Event evt)
Description copied from class:ProtocolAn event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally the event is either a) discarded, or b) an event is sent down the stack usingdown_prot.down()or c) the event (or another event) is sent up the stack usingup_prot.up().
-
up
public java.lang.Object up(Message msg)
Description copied from class:ProtocolA single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
isDigestNeeded
protected boolean isDigestNeeded()
When FLUSH is used we do not need to pass digests between members (see JGroups/doc/design/FLUSH.txt)- Returns:
- true if use of digests is required, false otherwise
-
handleConfig
protected void handleConfig(java.util.Map<java.lang.String,java.lang.Object> config)
-
handleStateChunk
protected void handleStateChunk(Address sender, byte[] buffer, int offset, int length)
-
handleEOF
protected void handleEOF(Address sender)
-
handleException
protected void handleException(java.lang.Throwable exception)
-
getStateFromApplication
protected void getStateFromApplication(Address requester, java.io.OutputStream out, boolean use_separate_thread)
-
setStateInApplication
protected void setStateInApplication(java.io.InputStream in, java.lang.Object resource, Address provider)
-
closeBarrierAndSuspendStable
public void closeBarrierAndSuspendStable()
-
openBarrierAndResumeStable
public void openBarrierAndResumeStable()
-
openBarrier
protected void openBarrier()
-
resumeStable
protected void resumeStable()
-
sendEof
protected void sendEof(Address requester)
-
sendException
protected void sendException(Address requester, java.lang.Throwable exception)
-
createThreadPool
protected java.util.concurrent.ThreadPoolExecutor createThreadPool()
-
determineCoordinator
protected Address determineCoordinator()
-
handleViewChange
protected void handleViewChange(View v)
-
handle
public void handle(Address state_requester)
- Specified by:
handlein interfaceProcessingQueue.Handler<Address>
-
handleStateReq
protected void handleStateReq(Address requester)
-
createStreamToRequester
protected void createStreamToRequester(Address requester)
Creates an OutputStream to the state requester to write the state
-
createStreamToProvider
protected abstract Tuple<java.io.InputStream,java.lang.Object> createStreamToProvider(Address provider, StreamingStateTransfer.StateHeader hdr) throws java.lang.Exception
Creates an InputStream to the state provider to read the state. Return the input stream and a handback object as a tuple. The handback object is handed back to the subclass when done, or in case of an error (e.g. to clean up resources)- Throws:
java.lang.Exception
-
close
protected void close(java.lang.Object resource)
-
useAsyncStateDelivery
protected boolean useAsyncStateDelivery()
-
modifyStateResponseHeader
protected void modifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)
-
handleStateRsp
protected void handleStateRsp(Address provider, StreamingStateTransfer.StateHeader hdr)
-
punchHoleFor
protected void punchHoleFor(Address member)
-
closeHoleFor
protected void closeHoleFor(Address member)
-
-