Class MessageSourceMultiplexer<T extends deltix.data.stream.TimeStampedMessage>

  • All Implemented Interfaces:
    deltix.data.stream.MessageSource<T>, RealTimeMessageSource<T>, deltix.util.concurrent.AbstractCursor, deltix.util.lang.Disposable, java.io.Closeable, java.lang.AutoCloseable
    Direct Known Subclasses:
    IAMessageSourceMultiplexer

    public class MessageSourceMultiplexer<T extends deltix.data.stream.TimeStampedMessage>
    extends java.lang.Object
    implements RealTimeMessageSource<T>
    Merges multiple time-sorted message streams into one and allows source feeds to be added and removed dynamically.
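    The merging idea described above can be illustrated with a minimal k-way merge over a priority queue. This is only a sketch under stated assumptions, not the multiplexer's actual implementation: MergeSketch and Head are hypothetical names, feeds are modelled as plain lists of timestamps, and ascending order is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for the merging idea; not the deltix implementation.
final class MergeSketch {
    // One queue entry: the head timestamp of a feed plus the rest of that feed.
    private static final class Head {
        final long time;
        final Iterator<Long> rest;

        Head(long time, Iterator<Long> rest) {
            this.time = time;
            this.rest = rest;
        }
    }

    // Merges feeds that are each already sorted by ascending timestamp.
    static List<Long> merge(List<List<Long>> feeds) {
        PriorityQueue<Head> queue =
            new PriorityQueue<>(Comparator.comparingLong((Head h) -> h.time));
        for (List<Long> feed : feeds) {
            Iterator<Long> it = feed.iterator();
            if (it.hasNext()) {
                queue.add(new Head(it.next(), it));
            }
        }
        List<Long> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            // The smallest head across all feeds is the next merged message.
            Head h = queue.poll();
            out.add(h.time);
            // Re-queue the same feed with its next message, if it has one.
            if (h.rest.hasNext()) {
                queue.add(new Head(h.rest.next(), h.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> merged = merge(Arrays.asList(
            Arrays.asList(1L, 4L, 9L),
            Arrays.asList(2L, 3L, 10L),
            Arrays.asList(5L)));
        System.out.println(merged); // prints [1, 2, 3, 4, 5, 9, 10]
    }
}
```

    Each feed contributes at most one entry to the queue at a time, so the queue size is bounded by the number of feeds rather than by the number of messages.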
    • Field Detail

      • queue

        protected PriorityQueue<T> queue
      • currentSource

        protected deltix.data.stream.MessageSource<T> currentSource
      • currentTime

        protected long currentTime
      • currentMessage

        protected T currentMessage
      • isAtBeginning

        protected boolean isAtBeginning
      • emptySources

        protected deltix.util.collections.generated.ObjectHashSet<deltix.data.stream.MessageSource<T>> emptySources
      • checkSources

        protected java.util.ArrayList<deltix.data.stream.MessageSource<T>> checkSources
      • asyncException

        protected java.lang.RuntimeException asyncException
      • realtimeMessage

        protected T realtimeMessage
      • isRealTime

        protected boolean isRealTime
      • realTimeStarted

        protected boolean realTimeStarted
      • realTimeNotification

        protected final boolean realTimeNotification
    • Constructor Detail

      • MessageSourceMultiplexer

        public MessageSourceMultiplexer()
      • MessageSourceMultiplexer

        public MessageSourceMultiplexer​(boolean ascending,
                                        boolean realTimeNotification,
                                        java.util.Comparator<T> c)
      • MessageSourceMultiplexer

        public MessageSourceMultiplexer​(boolean ascending,
                                        boolean realTimeNotification)
      • MessageSourceMultiplexer

        @SafeVarargs
        public MessageSourceMultiplexer​(deltix.data.stream.MessageSource<T>... feeds)
      • MessageSourceMultiplexer

        public MessageSourceMultiplexer​(java.util.List<deltix.data.stream.MessageSource<T>> feeds)
    • Method Detail

      • setAvailabilityListener

        public void setAvailabilityListener​(java.lang.Runnable lnr)
      • isLive

        public boolean isLive()
      • setLive

        public void setLive​(boolean live)
      • realTimeAvailable

        public boolean realTimeAvailable()
        If true, the multiplexer (MSM) expects a real-time message (RealTimeStartMessage) from each added source and emits its own RealTimeStartMessage only after all sources have switched to real-time mode. In that case, every added message source is expected to be a RealTimeMessageSource whose realTimeAvailable() returns true.
        Specified by:
        realTimeAvailable in interface RealTimeMessageSource<T extends deltix.data.stream.TimeStampedMessage>
        Returns:
        true if MSM supports real-time mode.
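        The "emit only after all sources have switched" rule can be sketched as a small gate that tracks which sources are still in historical mode. This is an illustration under assumptions, not the multiplexer's code: RealTimeGate, addSource and onRealTimeStart are hypothetical names, and sources are identified by plain strings.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; names are illustrative, not part of the deltix API.
final class RealTimeGate {
    // Sources that have not yet reported their switch to real-time mode.
    private final Set<String> pending = new HashSet<>();
    private boolean started;

    void addSource(String id) {
        pending.add(id);
    }

    // Called when a source reports its real-time start.
    // Returns true exactly once: when the last pending source has reported.
    boolean onRealTimeStart(String id) {
        pending.remove(id);
        if (!started && pending.isEmpty()) {
            started = true;
            return true;
        }
        return false;
    }

    boolean isStarted() {
        return started;
    }

    public static void main(String[] args) {
        RealTimeGate gate = new RealTimeGate();
        gate.addSource("a");
        gate.addSource("b");
        System.out.println(gate.onRealTimeStart("a")); // prints false
        System.out.println(gate.onRealTimeStart("b")); // prints true
    }
}
```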
      • isRealTime

        public boolean isRealTime()
        Specified by:
        isRealTime in interface RealTimeMessageSource<T extends deltix.data.stream.TimeStampedMessage>
        Returns:
        true if this source has already switched from the historical to the real-time portion of the data
      • isRealTimeStarted

        public boolean isRealTimeStarted()
      • createRealTimeMessage

        public T createRealTimeMessage()
      • isRealTimeMessage

        public boolean isRealTimeMessage​(T message)
      • getAllowLateOutOfOrder

        public boolean getAllowLateOutOfOrder()
      • setAllowLateOutOfOrder

        public void setAllowLateOutOfOrder​(boolean allowLateOutOfOrder)
      • closeAndRemove

        public boolean closeAndRemove​(deltix.data.stream.MessageSource<T> feed)
        Removes the specified input feed from this multiplexer. If the cursor is not at the beginning and the current feed is removed, the current message becomes null. The cursor, however, is not advanced until next() is called.
        Parameters:
        feed - The feed to close and remove.
        Returns:
        true if the current message was removed.
      • remove

        public boolean remove​(deltix.data.stream.MessageSource<T> feed)
      • add

        public void add​(deltix.data.stream.MessageSource<T> feed)
        Adds a new feed without fast-forwarding it.
        Parameters:
        feed - The new feed
      • add

        public void add​(deltix.data.stream.MessageSource<T> feed,
                        long fastForwardToTime)
        Adds a new feed and fast-forwards it (rewinds if descending order), if necessary, until the specified timestamp, unless the allowLateOutOfOrder flag is not set. If the feed implements IntermittentlyAvailableResource, an availability listener is automatically installed in it.
        Parameters:
        feed - The new feed
        fastForwardToTime - Scroll the feed to this timestamp.
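        Fast-forwarding in ascending order can be sketched as skipping messages until one has a timestamp of at least the target. This is a simplified illustration only: FastForwardSketch is a hypothetical name, the feed is modelled as an iterator of timestamps, and the allowLateOutOfOrder flag and descending order are deliberately left out.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.OptionalLong;

// Hypothetical sketch of fast-forwarding; not the deltix implementation.
final class FastForwardSketch {
    // Consumes the feed until a timestamp >= target is found.
    // Returns that timestamp, or an empty value if the feed ends first.
    static OptionalLong fastForward(Iterator<Long> feed, long target) {
        while (feed.hasNext()) {
            long t = feed.next();
            if (t >= target) {
                return OptionalLong.of(t);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        Iterator<Long> feed = Arrays.asList(10L, 20L, 30L, 40L).iterator();
        System.out.println(fastForward(feed, 25L)); // prints OptionalLong[30]
    }
}
```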
      • closeFeed

        protected final void closeFeed​(deltix.data.stream.MessageSource<T> feed)
      • addEmptySource

        protected void addEmptySource​(deltix.data.stream.MessageSource<T> feed)
      • handleException

        protected final boolean handleException​(deltix.data.stream.MessageSource<T> feed,
                                                java.lang.RuntimeException rtx)
        Returns true if the feed is closed.
        Parameters:
        feed - The feed that raised the exception.
        rtx - The exception to handle.
        Returns:
        true if the feed is closed
      • nextAvailable

        protected deltix.util.concurrent.NextResult nextAvailable​(RealTimeMessageSource<T> feed)
      • moveNext

        protected deltix.util.concurrent.NextResult moveNext​(deltix.data.stream.MessageSource<T> feed,
                                                             boolean addEmpty)
      • advanceRealTime

        protected final deltix.util.concurrent.NextResult advanceRealTime​(RealTimeMessageSource<T> source)
      • addSync

        protected final void addSync​(deltix.data.stream.MessageSource<T> feed,
                                     long fastForwardToTime)
      • clearSources

        public void clearSources()
      • reset

        public void reset​(deltix.data.stream.MessageSource<T>... feeds)
        Calls clearSources() and then adds the specified feeds.
        Parameters:
        feeds - The feeds to add.
      • reset

        public void reset​(java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
        Calls clearSources() and then adds the specified feeds.
        Parameters:
        feeds - The feeds to add.
      • reset

        public void reset​(long startTime,
                          deltix.data.stream.MessageSource<T>... feeds)
        Calls clearSources() and then adds the specified feeds, fast-forwarding each one until the timestamp of its current message is at least startTime.
        Parameters:
        startTime - Fast-forward to this time.
        feeds - The feeds to add.
      • reset

        public void reset​(long startTime)
        Calls clearSources() and sets the fast-forward time to the specified value.
        Parameters:
        startTime - Fast-forward to this time.
      • reset

        public void reset​(long startTime,
                          java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
        Calls clearSources() and then adds the specified feeds, fast-forwarding each one until the timestamp of its current message is at least startTime.
        Parameters:
        startTime - Fast-forward to this time.
        feeds - The feeds to add.
      • setException

        public void setException​(java.lang.RuntimeException x)
        Asynchronous method that causes next() to throw the specified exception.
        Parameters:
        x - The exception that next() will throw.
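        The asynchronous hand-off described here can be sketched with a thread-safe reference that next() checks before advancing. This is a minimal illustration under assumptions: AsyncFailureCursor is a hypothetical class, and whether the real multiplexer clears the exception after throwing it is not stated in this documentation (the sketch keeps it).

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical cursor; illustrates the pattern only, not the deltix code.
final class AsyncFailureCursor {
    // Written by any thread, read by the thread that calls next().
    private final AtomicReference<RuntimeException> asyncException =
        new AtomicReference<>();
    private int position;

    void setException(RuntimeException x) {
        asyncException.set(x);
    }

    boolean next() {
        RuntimeException x = asyncException.get();
        if (x != null) {
            throw x; // surface the asynchronously reported failure
        }
        position++;
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncFailureCursor cursor = new AsyncFailureCursor();
        cursor.next();
        // Another thread reports a failure while the consumer is idle.
        Thread producer = new Thread(
            () -> cursor.setException(new IllegalStateException("feed failed")));
        producer.start();
        producer.join();
        try {
            cursor.next();
        } catch (IllegalStateException e) {
            System.out.println("next() threw: " + e.getMessage());
        }
    }
}
```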
      • next

        public boolean next()
        Specified by:
        next in interface deltix.util.concurrent.AbstractCursor
      • syncNext

        public boolean syncNext()
      • syncNext

        protected deltix.util.concurrent.NextResult syncNext​(boolean throwable)
      • processEmptyQueue

        protected final deltix.util.concurrent.NextResult processEmptyQueue​(boolean throwable)
      • sendRealTimeMessage

        protected final void sendRealTimeMessage​(java.lang.String logText)
      • isEmpty

        protected boolean isEmpty()
      • isAtEnd

        public final boolean isAtEnd()
        Specified by:
        isAtEnd in interface deltix.util.concurrent.AbstractCursor
      • getMessage

        public final T getMessage()
        Specified by:
        getMessage in interface deltix.data.stream.MessageSource<T extends deltix.data.stream.TimeStampedMessage>
      • getCurrentTime

        public final long getCurrentTime()
      • getCurrentSource

        public deltix.data.stream.MessageSource<T> getCurrentSource()
      • isClosed

        public boolean isClosed()
      • syncIsAtEnd

        public final boolean syncIsAtEnd()
      • syncGetMessage

        public final T syncGetMessage()
      • syncGetCurrentTime

        public final long syncGetCurrentTime()
      • syncGetCurrentSource

        public deltix.data.stream.MessageSource<T> syncGetCurrentSource()
      • syncIsClosed

        public boolean syncIsClosed()
      • close

        public void close()
        Specified by:
        close in interface java.lang.AutoCloseable
        Specified by:
        close in interface java.io.Closeable
        Specified by:
        close in interface deltix.util.lang.Disposable