Class ReliableMulticast

    • Field Detail

      • use_mcast_xmit

        protected boolean use_mcast_xmit
        Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a message, the sender only retransmits once
      • use_mcast_xmit_req

        protected boolean use_mcast_xmit_req
        Use an mcast to request retransmission of missing messages. May be costly as every member will send a response
      • xmit_from_random_member

        protected boolean xmit_from_random_member
        Ask a random member for retransmission of a missing message. If true, discard_delivered_msgs is set to false
      • discard_delivered_msgs

        protected boolean discard_delivered_msgs
        Messages that have been received in order are sent up the stack (=delivered) to the application) and removed from the retransmit table, so they can get GC'ed. When this property is true, everyone (except the sender of a message) removes the message from their retransmit table as soon as it has been delivered to the application
      • log_discard_msgs

        protected boolean log_discard_msgs
        If true, logs messages discarded because received from other members
      • log_not_found_msgs

        protected boolean log_not_found_msgs
      • xmit_interval

        protected long xmit_interval
      • become_server_queue_size

        protected int become_server_queue_size
      • suppress_time_non_member_warnings

        protected long suppress_time_non_member_warnings
      • max_xmit_req_size

        protected int max_xmit_req_size
      • max_batch_size

        protected int max_batch_size
      • reuse_message_batches

        protected boolean reuse_message_batches
      • send_atomically

        protected boolean send_atomically
      • sends_can_block

        protected boolean sends_can_block
      • num_messages_sent

        protected final java.util.concurrent.atomic.LongAdder num_messages_sent
      • num_messages_received

        protected final java.util.concurrent.atomic.LongAdder num_messages_received
      • DUMMY_OOB_MSG

        protected static final Message DUMMY_OOB_MSG
      • no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs

        protected final java.util.function.Predicate<Message> no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs
      • remove_filter

        protected static final java.util.function.Predicate<Message> remove_filter
      • BATCH_ACCUMULATOR

        protected static final java.util.function.BiConsumer<MessageBatch,​Message> BATCH_ACCUMULATOR
      • SEQNO_GETTER

        protected final java.util.function.Function<Message,​java.lang.Long> SEQNO_GETTER
      • HAS_HEADER

        protected final java.util.function.Predicate<Message> HAS_HEADER
      • xmit_reqs_received

        protected final java.util.concurrent.atomic.LongAdder xmit_reqs_received
      • xmit_reqs_sent

        protected final java.util.concurrent.atomic.LongAdder xmit_reqs_sent
      • xmit_rsps_received

        protected final java.util.concurrent.atomic.LongAdder xmit_rsps_received
      • xmit_rsps_sent

        protected final java.util.concurrent.atomic.LongAdder xmit_rsps_sent
      • is_trace

        protected boolean is_trace
      • is_server

        protected volatile boolean is_server
      • members

        protected volatile java.util.List<Address> members
      • view

        protected volatile View view
      • seqno

        protected final java.util.concurrent.atomic.AtomicLong seqno
      • xmit_task

        protected java.util.concurrent.Future<?> xmit_task
        RetransmitTask running every xmit_interval ms
      • xmit_task_map

        protected final java.util.Map<Address,​java.lang.Long> xmit_task_map
        Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539)
      • stable_xmit_map

        protected final java.util.Map<Address,​java.lang.Long> stable_xmit_map
      • leaving

        protected volatile boolean leaving
      • running

        protected volatile boolean running
      • stability_msgs

        protected final BoundedList<java.lang.String> stability_msgs
        Keeps the last N stability messages
      • digest_history

        protected final BoundedList<java.lang.String> digest_history
        Keeps a bounded list of the last N digest sets
      • become_server_queue

        protected java.util.Queue<Message> become_server_queue
      • suppress_log_non_member

        protected SuppressLog<Address> suppress_log_non_member
        Log to suppress identical warnings for messages from non-members
    • Constructor Detail

      • ReliableMulticast

        public ReliableMulticast()
    • Method Detail

      • isXmitTaskRunning

        public boolean isXmitTaskRunning()
      • getNonMemberMessages

        public int getNonMemberMessages()
      • createXmitWindow

        protected abstract Buffer<Message> createXmitWindow​(long initial_seqno)
      • clearNonMemberCache

        public void clearNonMemberCache()
      • printCachedBatches

        public java.lang.String printCachedBatches()
      • getXmitRequestsReceived

        public long getXmitRequestsReceived()
      • getXmitRequestsSent

        public long getXmitRequestsSent()
      • getXmitResponsesReceived

        public long getXmitResponsesReceived()
      • getXmitResponsesSent

        public long getXmitResponsesSent()
      • useMcastXmit

        public boolean useMcastXmit()
      • useMcastXmitReq

        public boolean useMcastXmitReq()
      • xmitFromRandomMember

        public boolean xmitFromRandomMember()
      • xmitFromRandomMember

        public ReliableMulticast xmitFromRandomMember​(boolean x)
      • discardDeliveredMsgs

        public boolean discardDeliveredMsgs()
      • discardDeliveredMsgs

        public ReliableMulticast discardDeliveredMsgs​(boolean d)
      • logDiscardMessages

        public boolean logDiscardMessages()
      • logNotFoundMessages

        public boolean logNotFoundMessages()
      • logNotFoundMessages

        public ReliableMulticast logNotFoundMessages​(boolean flag)
      • setXmitFromRandomMember

        public ReliableMulticast setXmitFromRandomMember​(boolean r)
      • setDiscardDeliveredMsgs

        public ReliableMulticast setDiscardDeliveredMsgs​(boolean d)
      • getXmitInterval

        public long getXmitInterval()
      • getBecomeServerQueueSize

        public int getBecomeServerQueueSize()
      • setBecomeServerQueueSize

        public ReliableMulticast setBecomeServerQueueSize​(int b)
      • getSuppressTimeNonMemberWarnings

        public long getSuppressTimeNonMemberWarnings()
      • setSuppressTimeNonMemberWarnings

        public ReliableMulticast setSuppressTimeNonMemberWarnings​(long s)
      • getMaxXmitReqSize

        public int getMaxXmitReqSize()
      • sendsCanBlock

        public boolean sendsCanBlock()
      • getNumMessagesSent

        public long getNumMessagesSent()
      • getNumMessagesReceived

        public long getNumMessagesReceived()
      • reuseMessageBatches

        public boolean reuseMessageBatches()
      • sendAtomically

        public boolean sendAtomically()
      • isTrace

        public boolean isTrace()
      • setLevel

        public <T extends Protocol> T setLevel​(java.lang.String level)
        Description copied from class: Protocol
        Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
        Overrides:
        setLevel in class Protocol
        Parameters:
        level - The new level. Valid values are "fatal", "error", "warn", "info", "debug", "trace" (capitalization not relevant)
      • getBecomeServerQueueSizeActual

        public int getBecomeServerQueueSizeActual()
      • getBuf

        public <T extends Buffer<Message>> T getBuf​(Address sender)
        Returns the receive window for sender; only used for testing. Do not use !
      • setTimer

        public void setTimer​(TimeScheduler timer)
        Only used for unit tests, don't use !
      • getXmitTableUndeliveredMsgs

        public int getXmitTableUndeliveredMsgs()
      • getXmitTableMissingMessages

        public int getXmitTableMissingMessages()
      • getXmitTableCapacity

        public long getXmitTableCapacity()
      • getSizeOfAllMessages

        public long getSizeOfAllMessages()
      • getSizeOfAllMessagesInclHeaders

        public long getSizeOfAllMessagesInclHeaders()
      • printMessages

        public java.lang.String printMessages()
      • printBatches

        public java.lang.String printBatches()
      • getCurrentSeqno

        public long getCurrentSeqno()
      • printStabilityMessages

        public java.lang.String printStabilityMessages()
      • printDigestHistory

        public java.lang.String printDigestHistory()
      • init

        public void init()
                  throws java.lang.Exception
        Description copied from class: Protocol
        Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
        Specified by:
        init in interface Lifecycle
        Overrides:
        init in class Protocol
        Throws:
        java.lang.Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the the channel constructor will throw an exception
      • providedUpServices

        public java.util.List<java.lang.Integer> providedUpServices()
        Description copied from class: Protocol
        List of events that are provided to layers above (they will be handled when sent down from above)
        Overrides:
        providedUpServices in class Protocol
      • start

        public void start()
                   throws java.lang.Exception
        Description copied from class: Protocol
        This method is called on a JChannel.connect(String); starts work. Protocols are connected ready to receive events. Will be called from bottom to top.
        Specified by:
        start in interface Lifecycle
        Overrides:
        start in class Protocol
        Throws:
        java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
      • down

        public java.lang.Object down​(Event evt)
        Callback. Called by superclass when event may be handled.

        Do not use down_prot.down() in this method as the event is passed down by default by the superclass after this method returns !

        Overrides:
        down in class Protocol
      • down

        public java.lang.Object down​(Message msg)
        Description copied from class: Protocol
        A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
        Overrides:
        down in class Protocol
      • up

        public java.lang.Object up​(Event evt)
        Callback. Called by superclass when event may be handled.

        Do not use passUp in this method as the event is passed up by default by the superclass after this method returns !

        Overrides:
        up in class Protocol
      • up

        public java.lang.Object up​(Message msg)
        Description copied from class: Protocol
        A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
        Overrides:
        up in class Protocol
      • up

        public void up​(MessageBatch mb)
        Description copied from class: Protocol
        Sends up a multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

        The default processing below sends messages up the stack individually, based on a matching criteria (calling Protocol.accept(Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

        Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

        Overrides:
        up in class Protocol
        Parameters:
        mb - The message batch
      • handleProbe

        public java.util.Map<java.lang.String,​java.lang.String> handleProbe​(java.lang.String... keys)
        Description copied from interface: DiagnosticsHandler.ProbeHandler
        Handles a probe. For each key that is handled, the key and its result should be in the returned map.
        Specified by:
        handleProbe in interface DiagnosticsHandler.ProbeHandler
        Returns:
        Map. A map of keys and values. A null return value is permissible.
      • queueMessage

        protected void queueMessage​(Message msg,
                                    long seqno)
      • unknownMember

        protected void unknownMember​(Address sender,
                                     java.lang.Object message)
      • addToSendBuffer

        protected boolean addToSendBuffer​(Buffer<Message> win,
                                          long seq,
                                          Message msg,
                                          java.util.function.Predicate<Message> filter,
                                          boolean dont_block)
        Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed
      • resend

        protected void resend​(Message msg)
      • handleMessage

        protected void handleMessage​(Message msg,
                                     NakAckHeader hdr)
        Finds the corresponding retransmit buffer and adds the message to it (according to seqno). Then removes as many messages as possible and passes them up the stack. Discards messages from non-members.
      • handleMessageBatch

        protected void handleMessageBatch​(MessageBatch mb)
      • removeAndDeliver

        protected void removeAndDeliver​(Buffer<Message> win,
                                        ReliableMulticast.Entry e,
                                        Address sender,
                                        boolean loopback,
                                        AsciiString cluster)
        Efficient way of checking whether another thread is already processing messages from sender. If that's the case, we return immediately and let the existing thread process our message (https://issues.redhat.com/browse/JGRP-829). Benefit: fewer threads blocked on the same lock, these threads can be returned to the thread pool
      • handleXmitReq

        protected void handleXmitReq​(Address xmit_requester,
                                     SeqnoList missing_msgs,
                                     Address original_sender)
        Retransmits messsages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester, called when XMIT_REQ is received.
        Parameters:
        xmit_requester - The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
        missing_msgs - A list of seqnos that have to be retransmitted
        original_sender - The member who originally sent the messsage. Guaranteed to be non-null
      • flushBecomeServerQueue

        protected void flushBecomeServerQueue()
        Flushes the queue. Done in a separate thread as we don't want to block the GMS.installView(View, Digest) method (called when a view is installed).
      • sendXmitRsp

        protected void sendXmitRsp​(Address dest,
                                   Message msg)
        Sends a message msg to the requester. We have to wrap the original message into a retransmit message, as we need to preserve the original message's properties, such as src, headers etc.
        Parameters:
        dest -
        msg -
      • handleHighestSeqno

        protected void handleHighestSeqno​(Address sender,
                                          long seqno)
        Compares the sender's highest seqno with my highest seqno: if the sender's is higher, ask sender for retransmission
        Parameters:
        sender - The sender
        seqno - The highest seqno sent by sender
      • handleAck

        protected void handleAck​(Address sender,
                                 long ack)
      • isCallerRunsHandler

        protected static boolean isCallerRunsHandler​(java.util.concurrent.RejectedExecutionHandler h)
      • adjustReceivers

        protected void adjustReceivers​(java.util.List<Address> members)
        Removes old members from xmit-table and adds new members to xmit-table (at seqnos hd=0, hr=0). This method is not called concurrently
      • getDigest

        public Digest getDigest()
        Returns a message digest: for each member P the highest delivered and received seqno is added
      • setDigest

        protected void setDigest​(Digest digest)
        Creates a retransmit buffer for each sender in the digest according to the sender's seqno. If a buffer already exists, it resets it.
      • mergeDigest

        protected void mergeDigest​(Digest digest)
        For all members of the digest, adjust the retransmit buffers in xmit_table. If no entry exists, create one with the initial seqno set to the seqno of the member in the digest. If the member already exists, and is not the local address, replace it with the new entry (https://issues.redhat.com/browse/JGRP-699) if the digest's seqno is greater than the seqno in the window.
      • overwriteDigest

        protected void overwriteDigest​(Digest digest)
        Overwrites existing entries, but does NOT remove entries not found in the digest
      • setDigest

        protected void setDigest​(Digest digest,
                                 boolean merge)
        Sets or merges the digest. If there is no entry for a given member in xmit_table, create a new buffer. Else skip the existing entry, unless it is a merge. In this case, skip the existing entry if its seqno is greater than or equal to the one in the digest, or reset the window and create a new one if not.
        Parameters:
        digest - The digest
        merge - Whether to merge the new digest with our own, or not
      • stable

        protected void stable​(Digest digest)
        Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update xmit_table: for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the retransmit buffer corresponding to P which are <= seqno at digest[P].
      • retransmit

        protected void retransmit​(long first_seqno,
                                  long last_seqno,
                                  Address sender,
                                  boolean multicast_xmit_request)
      • retransmit

        protected void retransmit​(SeqnoList missing_msgs,
                                  Address sender,
                                  boolean multicast_xmit_request)
      • reset

        protected void reset()
      • sizeOfAllMessages

        protected static long sizeOfAllMessages​(Buffer<Message> win,
                                                boolean include_headers)
      • startRetransmitTask

        protected void startRetransmitTask()
      • stopRetransmitTask

        protected void stopRetransmitTask()
      • triggerXmit

        public void triggerXmit()