Package org.jgroups.blocks
Class RequestCorrelator
- java.lang.Object
-
- org.jgroups.blocks.RequestCorrelator
-
public class RequestCorrelator extends java.lang.Object
Framework to send requests and receive matching responses (matched on request ID). Multiple requests can be sent at a time. Whenever a response is received, the correct Request is looked up (key = id) and its method receiveResponse() is invoked.
- Author:
- Bela Ban
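The correlation scheme described above can be sketched with standard-library types only. This is a simplified model under stated assumptions, not the JGroups implementation: MiniCorrelator, send, receiveResponse and pendingCount are hypothetical names, and a CompletableFuture stands in for the Request object whose receiveResponse() would be invoked.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of request/response correlation (not JGroups code).
class MiniCorrelator {
    // Generates unique request IDs, mirroring the role of REQUEST_ID.
    private static final AtomicLong NEXT_ID = new AtomicLong(1);

    // Pending requests, keyed by request ID.
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a pending request and returns its ID; a sender would copy the ID into a header.
    long send(CompletableFuture<String> request) {
        long id = NEXT_ID.getAndIncrement();
        pending.put(id, request);
        return id;
    }

    // Looks up the request by ID and hands it the response; returns false if the ID is unknown.
    boolean receiveResponse(long id, String response) {
        CompletableFuture<String> request = pending.get(id);
        if (request == null)
            return false;
        request.complete(response);
        return true;
    }

    // Drops the entry once all responses have arrived, comparable to done(long).
    void done(long id) {
        pending.remove(id);
    }

    // Number of requests still waiting for responses.
    int pendingCount() {
        return pending.size();
    }
}
```

Because each response carries the ID of its request, several requests can be outstanding at once and responses may arrive in any order.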
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | RequestCorrelator.Header | The header for RequestCorrelator messages
static class | RequestCorrelator.MultiDestinationHeader |
protected class | RequestCorrelator.MyProbeHandler |
protected class | RequestCorrelator.ResponseImpl |
-
Field Summary
Fields
Modifier and Type | Field | Description
protected boolean | async_dispatching | Whether or not to use async dispatcher
protected boolean | async_rsp_handling | When enabled, responses are handled by the common ForkJoinPool (https://issues.redhat.com/browse/JGRP-2644)
protected AverageMinMax | avg_req_delivery |
protected AverageMinMax | avg_rsp_delivery |
protected java.util.concurrent.ForkJoinPool | common_pool |
protected short | corr_id | Makes the instance unique (together with IDs)
protected Protocol | down_prot | The protocol layer to use to pass up/down messages.
protected Address | local_addr | The address of this group member
protected static Log | log |
protected RequestCorrelator.MyProbeHandler | probe_handler |
protected RequestHandler | request_handler | The handler for the incoming requests.
protected static java.util.concurrent.atomic.AtomicLong | REQUEST_ID | To generate unique request IDs
protected java.util.Map<java.lang.Long,Request<?>> | requests | The table of pending requests (keys = request IDs (Long), values = Request)
protected RpcStats | rpc_stats |
protected boolean | rpcstats |
protected boolean | started |
protected View | view |
protected boolean | wrap_exceptions |
-
Constructor Summary
Constructors
Constructor | Description
RequestCorrelator(Protocol down_prot, RequestHandler handler, Address local_addr) | Constructor.
-
Method Summary
Modifier and Type | Method | Description
protected <T> void | addEntry(Request<T> req, RequestCorrelator.Header hdr, boolean unicast) |
boolean | asyncDispatching() |
RequestCorrelator | asyncDispatching(boolean flag) |
boolean | asyncRspHandling() |
RequestCorrelator | asyncRspHandling(boolean f) |
protected void | dispatch(Message msg, RequestCorrelator.Header hdr) |
void | done(long id) | Used to signal that a certain request may be garbage collected as all responses have been received
Address | getLocalAddress() |
protected void | handleRequest(Message req, RequestCorrelator.Header hdr) | Handle a request msg for this correlator
protected void | handleResponse(Message rsp, RequestCorrelator.Header hdr) |
protected void | iterate(MessageBatch batch, boolean skip_excluded_msgs, boolean process_reqs, boolean process_rsps) |
protected static Message | makeReply(Message msg) |
boolean | receive(Event evt) | Callback.
boolean | receiveMessage(Message msg) | Handles a message coming from a layer below
void | receiveMessageBatch(MessageBatch batch) |
void | receiveView(View new_view) | View received: mark all responses from members that are not in new_view as suspected
void | registerProbeHandler(TP transport) |
protected RequestCorrelator | removeEntry(long req_id) |
boolean | rpcStats() |
RequestCorrelator | rpcStats(boolean b) |
protected void | sendAnycastRequest(Message req, java.util.Collection<Address> dest_mbrs) |
<T> void | sendMulticastRequest(java.util.Collection<Address> dest_mbrs, Message msg, Request<T> req, RequestOptions opts) | Sends a request to a group.
protected void | sendReply(Message req, long req_id, java.lang.Object reply, boolean is_exception) |
protected void | sendResponse(Message rsp, long req_id, boolean is_exception) |
<T> void | sendUnicastRequest(Message msg, Request<T> req, RequestOptions opts) | Sends a request to a single destination
RequestCorrelator | setLocalAddress(Address a) |
void | setMemberUnreachable(Address mbr) |
void | setRequestHandler(RequestHandler handler) |
void | setSiteUnreachable(java.lang.String site) | An entire site is down; mark all requests that point to that site as unreachable (used by RELAY2)
protected boolean | skip(RequestCorrelator.Header hdr, Address sender) |
void | start() |
void | stop() |
void | unregisterProbeHandler(TP transport) |
boolean | wrapExceptions() |
RequestCorrelator | wrapExceptions(boolean flag) |
-
-
-
Field Detail
-
down_prot
protected Protocol down_prot
The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport
-
requests
protected final java.util.Map<java.lang.Long,Request<?>> requests
The table of pending requests (keys = request IDs (Long), values = Request)
-
REQUEST_ID
protected static final java.util.concurrent.atomic.AtomicLong REQUEST_ID
To generate unique request IDs
-
request_handler
protected RequestHandler request_handler
The handler for the incoming requests. It is called from inside the dispatcher thread
-
corr_id
protected short corr_id
Makes the instance unique (together with IDs)
-
local_addr
protected Address local_addr
The address of this group member
-
view
protected volatile View view
-
started
protected boolean started
-
async_dispatching
protected boolean async_dispatching
Whether or not to use async dispatcher
-
wrap_exceptions
protected boolean wrap_exceptions
-
async_rsp_handling
protected boolean async_rsp_handling
When enabled, responses are handled by the common ForkJoinPool (https://issues.redhat.com/browse/JGRP-2644)
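As a rough illustration of what handing response processing to the common ForkJoinPool can look like, consider the sketch below. It assumes nothing about the actual JGroups code path: RspDispatchSketch and its handleResponse method are hypothetical names, and responses are modeled as strings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;

// Hypothetical sketch: run response handling inline or on the common ForkJoinPool.
class RspDispatchSketch {
    private final ForkJoinPool commonPool = ForkJoinPool.commonPool();
    private final boolean asyncRspHandling;

    RspDispatchSketch(boolean asyncRspHandling) {
        this.asyncRspHandling = asyncRspHandling;
    }

    // Applies the handler to the response; the returned future completes with the handler's result.
    <R> CompletableFuture<R> handleResponse(String rsp, Function<String, R> handler) {
        if (asyncRspHandling) {
            // Asynchronous path: the delivering thread returns immediately.
            return CompletableFuture.supplyAsync(() -> handler.apply(rsp), commonPool);
        }
        // Synchronous path: the calling thread does the work.
        return CompletableFuture.completedFuture(handler.apply(rsp));
    }
}
```

The trade-off in such a design is that the thread delivering messages is freed sooner, while the order in which responses are processed is no longer tied to the order of delivery.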
-
probe_handler
protected final RequestCorrelator.MyProbeHandler probe_handler
-
common_pool
protected final java.util.concurrent.ForkJoinPool common_pool
-
rpcstats
protected volatile boolean rpcstats
-
rpc_stats
protected final RpcStats rpc_stats
-
avg_req_delivery
protected final AverageMinMax avg_req_delivery
-
avg_rsp_delivery
protected final AverageMinMax avg_rsp_delivery
-
log
protected static final Log log
-
-
Constructor Detail
-
RequestCorrelator
public RequestCorrelator(Protocol down_prot, RequestHandler handler, Address local_addr)
Constructor. Uses down_prot to send messages. If handler is not null, all incoming requests will be dispatched to it (via handleRequest(Message, Header)).
- Parameters:
down_prot - Used to send requests or responses.
handler - Request handler. Method handle(Message) will be called when a request is received.
local_addr - The address of this member
-
-
Method Detail
-
setRequestHandler
public void setRequestHandler(RequestHandler handler)
-
getLocalAddress
public Address getLocalAddress()
-
setLocalAddress
public RequestCorrelator setLocalAddress(Address a)
-
asyncDispatching
public boolean asyncDispatching()
-
asyncDispatching
public RequestCorrelator asyncDispatching(boolean flag)
-
asyncRspHandling
public boolean asyncRspHandling()
-
asyncRspHandling
public RequestCorrelator asyncRspHandling(boolean f)
-
wrapExceptions
public boolean wrapExceptions()
-
wrapExceptions
public RequestCorrelator wrapExceptions(boolean flag)
-
rpcStats
public boolean rpcStats()
-
rpcStats
public RequestCorrelator rpcStats(boolean b)
-
sendMulticastRequest
public <T> void sendMulticastRequest(java.util.Collection<Address> dest_mbrs, Message msg, Request<T> req, RequestOptions opts) throws java.lang.Exception
Sends a request to a group. If no response collector is given, no responses are expected (making the call asynchronous).
- Parameters:
dest_mbrs - The list of members who should receive the call. Usually a group RPC is sent via multicast, but a receiver drops the request if its own address is not in this list. Will not be used if it is null.
msg - The message to be sent.
req - A request (usually the object that invokes this method). Its methods receiveResponse() and suspect() will be invoked when a message has been received or a member is suspected.
- Throws:
java.lang.Exception
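The receiver-side rule for dest_mbrs (drop the request unless the local address is listed; ignore the list when it is null) can be modeled as below. This is a sketch only: DestFilterSketch and accepts are hypothetical names, and addresses are represented as plain strings rather than Address objects.

```java
import java.util.Collection;

// Hypothetical sketch of the receiver-side check on dest_mbrs (addresses modeled as strings).
final class DestFilterSketch {
    private DestFilterSketch() {}

    // A null list means "no restriction"; otherwise the receiver must be listed to accept.
    static boolean accepts(Collection<String> destMbrs, String localAddr) {
        return destMbrs == null || destMbrs.contains(localAddr);
    }
}
```

For example, a member "C" would drop a multicast request whose list is ["A", "B"], but would accept the same request if the list were null.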
-
sendUnicastRequest
public <T> void sendUnicastRequest(Message msg, Request<T> req, RequestOptions opts) throws java.lang.Exception
Sends a request to a single destination.
- Throws:
java.lang.Exception
-
done
public void done(long id)
Used to signal that a certain request may be garbage collected as all responses have been received
-
receive
public boolean receive(Event evt)
Callback. Called by the protocol below when a message has been received. The algorithm should test whether the message is destined for us and, if not, pass it up to the next layer. Otherwise, it should remove the header and check whether the message is a request or a response. In the first case, the message is delivered to the registered request handler (by calling its handle() method); in the second case, the corresponding response collector is looked up and the message is delivered to it.
- Parameters:
evt - The event to be received
- Returns:
- Whether or not the event was consumed. If true, don't pass the message up; otherwise pass it up
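The decision flow described for receive() can be sketched as follows. Everything here is an illustrative stand-in: ReceiveSketch, Msg, Type, expectResponse and lastReply are hypothetical names, a null type models a message without a correlator header, and strings replace real payloads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the receive path; Msg and Type are stand-ins, not JGroups types.
class ReceiveSketch {
    enum Type { REQ, RSP }

    static final class Msg {
        final Type type;      // null: no correlator header, so the message is not ours
        final long reqId;
        final String payload;

        Msg(Type type, long reqId, String payload) {
            this.type = type;
            this.reqId = reqId;
            this.payload = payload;
        }
    }

    private final Function<String, String> requestHandler;
    private final Map<Long, Consumer<String>> collectors = new HashMap<>();
    private String lastReply; // reply computed for the most recent request

    ReceiveSketch(Function<String, String> requestHandler) {
        this.requestHandler = requestHandler;
    }

    // Registers a response collector for a pending request ID.
    void expectResponse(long reqId, Consumer<String> collector) {
        collectors.put(reqId, collector);
    }

    String lastReply() {
        return lastReply;
    }

    // Returns true if the message was consumed, false if it should be passed up.
    boolean receive(Msg msg) {
        if (msg.type == null)
            return false;                                  // not for us: pass it up
        if (msg.type == Type.REQ) {
            lastReply = requestHandler.apply(msg.payload); // deliver to the request handler
        } else {
            Consumer<String> collector = collectors.get(msg.reqId);
            if (collector != null)
                collector.accept(msg.payload);             // deliver to the matching collector
        }
        return true;
    }
}
```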
-
start
public final void start()
-
stop
public void stop()
-
registerProbeHandler
public void registerProbeHandler(TP transport)
-
unregisterProbeHandler
public void unregisterProbeHandler(TP transport)
-
setSiteUnreachable
public void setSiteUnreachable(java.lang.String site)
An entire site is down; mark all requests that point to that site as unreachable (used by RELAY2)
-
setMemberUnreachable
public void setMemberUnreachable(Address mbr)
-
receiveView
public void receiveView(View new_view)
View received: mark all responses from members that are not in new_view as suspected
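A minimal model of the view-change rule above: members that a pending request is still waiting on, but that are missing from the new view, get marked as suspected. PendingRequestSketch and its methods are hypothetical names, members are modeled as strings, and treating a request as done once nobody is left to wait for is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of one pending request: tracks which members still owe a response.
class PendingRequestSketch {
    private final Set<String> awaiting = new HashSet<>();
    private final Set<String> suspected = new HashSet<>();

    PendingRequestSketch(Set<String> targets) {
        awaiting.addAll(targets);
    }

    // Members missing from the new view can no longer answer: mark them as suspected.
    void viewChange(Set<String> newView) {
        for (String mbr : new HashSet<>(awaiting)) { // iterate over a copy; 'awaiting' is modified
            if (!newView.contains(mbr)) {
                awaiting.remove(mbr);
                suspected.add(mbr);
            }
        }
    }

    // Records that a response arrived from the given member.
    void receiveResponse(String sender) {
        awaiting.remove(sender);
    }

    Set<String> suspected() {
        return suspected;
    }

    // Assumption of this sketch: the request is complete once nobody is left to wait for.
    boolean isDone() {
        return awaiting.isEmpty();
    }
}
```

Without such a rule, a request addressed to a member that has left the group could wait for a response that never arrives.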
-
receiveMessage
public boolean receiveMessage(Message msg)
Handles a message coming from a layer below.
- Returns:
- true if the message was consumed (don't pass it further up), else false
-
receiveMessageBatch
public void receiveMessageBatch(MessageBatch batch)
-
iterate
protected void iterate(MessageBatch batch, boolean skip_excluded_msgs, boolean process_reqs, boolean process_rsps)
-
skip
protected boolean skip(RequestCorrelator.Header hdr, Address sender)
-
sendAnycastRequest
protected void sendAnycastRequest(Message req, java.util.Collection<Address> dest_mbrs)
-
addEntry
protected <T> void addEntry(Request<T> req, RequestCorrelator.Header hdr, boolean unicast)
-
removeEntry
protected RequestCorrelator removeEntry(long req_id)
-
dispatch
protected void dispatch(Message msg, RequestCorrelator.Header hdr)
-
handleRequest
protected void handleRequest(Message req, RequestCorrelator.Header hdr)
Handle a request msg for this correlator
-
handleResponse
protected void handleResponse(Message rsp, RequestCorrelator.Header hdr)
-
sendReply
protected void sendReply(Message req, long req_id, java.lang.Object reply, boolean is_exception)
-
sendResponse
protected void sendResponse(Message rsp, long req_id, boolean is_exception)
-
-