Package deltix.data.stream
Class MessageSourceMultiplexer<T extends deltix.data.stream.TimeStampedMessage>
- java.lang.Object
-
- deltix.data.stream.MessageSourceMultiplexer<T>
-
- All Implemented Interfaces:
deltix.data.stream.MessageSource<T>, RealTimeMessageSource<T>, deltix.util.concurrent.AbstractCursor, deltix.util.lang.Disposable, java.io.Closeable, java.lang.AutoCloseable
- Direct Known Subclasses:
IAMessageSourceMultiplexer
public class MessageSourceMultiplexer<T extends deltix.data.stream.TimeStampedMessage> extends java.lang.Object implements RealTimeMessageSource<T>
Merges multiple time-sorted message streams into one. Source feeds can be added and removed dynamically.
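The merge described above can be illustrated with a small, self-contained sketch. It is not the deltix implementation: MergeSketch and its merge method are hypothetical names, messages are reduced to bare long timestamps, and only ascending order is handled. It assumes each input list is already sorted, and keeps the head of every feed in a java.util.PriorityQueue so that the earliest timestamp is always emitted next.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Conceptual k-way merge; hypothetical stand-in, not the library class. */
final class MergeSketch {

    /** Merges feeds that are each already sorted by ascending timestamp. */
    static List<Long> merge(List<List<Long>> feeds) {
        List<Iterator<Long>> iterators = new ArrayList<>();
        // Each queue entry is {timestamp, index of the feed it came from}.
        PriorityQueue<long[]> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));

        // Prime the queue with the first message of every non-empty feed.
        for (int i = 0; i < feeds.size(); i++) {
            Iterator<Long> it = feeds.get(i).iterator();
            iterators.add(it);
            if (it.hasNext()) {
                queue.add(new long[] {it.next(), i});
            }
        }

        List<Long> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            long[] head = queue.poll();          // earliest pending message
            out.add(head[0]);
            Iterator<Long> it = iterators.get((int) head[1]);
            if (it.hasNext()) {                  // refill from the same feed
                queue.add(new long[] {it.next(), head[1]});
            }
        }
        return out;
    }
}
```

In this sketch a feed drops out of the merge as soon as it is exhausted; the real class additionally supports adding and removing feeds while iteration is in progress, which the sketch does not attempt to model.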
-
-
Nested Class Summary
Nested Classes
static interface MessageSourceMultiplexer.ExceptionHandler
-
Field Summary
Fields
protected java.lang.RuntimeException asyncException
protected java.util.ArrayList<deltix.data.stream.MessageSource<T>> checkSources
protected T currentMessage
protected deltix.data.stream.MessageSource<T> currentSource
protected long currentTime
protected deltix.util.collections.generated.ObjectHashSet<deltix.data.stream.MessageSource<T>> emptySources
protected boolean isAtBeginning
protected boolean isRealTime
protected PriorityQueue<T> queue
protected T realtimeMessage
protected boolean realTimeNotification
protected boolean realTimeStarted
-
Constructor Summary
Constructors
MessageSourceMultiplexer()
MessageSourceMultiplexer(boolean ascending, boolean realTimeNotification)
MessageSourceMultiplexer(boolean ascending, boolean realTimeNotification, java.util.Comparator<T> c)
MessageSourceMultiplexer(deltix.data.stream.MessageSource<T>... feeds)
MessageSourceMultiplexer(java.util.List<deltix.data.stream.MessageSource<T>> feeds)
-
Method Summary
Methods
void add(deltix.data.stream.MessageSource<T> feed)
    Adds a new feed without fast-forwarding it.
void add(deltix.data.stream.MessageSource<T> feed, long fastForwardToTime)
    Adds a new feed and fast-forwards it (rewinds if descending order), if necessary, until the specified timestamp, unless the allowLateOutOfOrder flag is not set.
protected void addEmptySource(deltix.data.stream.MessageSource<T> feed)
protected void addSync(deltix.data.stream.MessageSource<T> feed, long fastForwardToTime)
protected deltix.util.concurrent.NextResult advanceRealTime(RealTimeMessageSource<T> source)
void clearSources()
void close()
boolean closeAndRemove(deltix.data.stream.MessageSource<T> feed)
    Removes the specified input feed from this multiplexer.
protected void closeFeed(deltix.data.stream.MessageSource<T> feed)
T createRealTimeMessage()
boolean getAllowLateOutOfOrder()
deltix.data.stream.MessageSource<T> getCurrentSource()
long getCurrentTime()
MessageSourceMultiplexer.ExceptionHandler getExceptionHandler()
T getMessage()
protected boolean handleException(deltix.data.stream.MessageSource<T> feed, java.lang.RuntimeException rtx)
    Returns true if the feed is closed.
void invalidateRealTimeState(RealTimeMessageSource<T> source)
boolean isAtEnd()
boolean isClosed()
protected boolean isEmpty()
boolean isLive()
boolean isRealTime()
boolean isRealTimeMessage(T message)
boolean isRealTimeStarted()
protected deltix.util.concurrent.NextResult moveNext(deltix.data.stream.MessageSource<T> feed, boolean addEmpty)
boolean next()
protected deltix.util.concurrent.NextResult nextAvailable(RealTimeMessageSource<T> feed)
protected deltix.util.concurrent.NextResult processEmptyQueue(boolean throwable)
boolean realTimeAvailable()
    If true, the multiplexer (MSM) expects a real-time message (RealTimeStartMessage) from each added source and emits RealTimeStartMessage only once all sources have switched to real-time mode.
boolean remove(deltix.data.stream.MessageSource<T> feed)
void reset(long startTime)
    Calls clearSources() and sets the fast-forward time to the specified value.
void reset(long startTime, deltix.data.stream.MessageSource<T>... feeds)
    Calls clearSources() and then adds the specified feeds, fast-forwarding each until the timestamp of the current message from the feed is at least startTime.
void reset(long startTime, java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
    Calls clearSources() and then adds the specified feeds, fast-forwarding each until the timestamp of the current message from the feed is at least startTime.
void reset(deltix.data.stream.MessageSource<T>... feeds)
    Calls clearSources() and then adds the specified feeds.
void reset(java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
    Calls clearSources() and then adds the specified feeds.
protected void sendRealTimeMessage(java.lang.String logText)
void setAllowLateOutOfOrder(boolean allowLateOutOfOrder)
void setAvailabilityListener(java.lang.Runnable lnr)
void setException(java.lang.RuntimeException x)
    Asynchronous method that causes next() to throw the specified exception.
void setExceptionHandler(MessageSourceMultiplexer.ExceptionHandler handler)
void setLive(boolean live)
deltix.data.stream.MessageSource<T> syncGetCurrentSource()
long syncGetCurrentTime()
T syncGetMessage()
boolean syncIsAtEnd()
boolean syncIsClosed()
boolean syncNext()
protected deltix.util.concurrent.NextResult syncNext(boolean throwable)
-
-
-
Field Detail
-
queue
protected PriorityQueue<T> queue
-
currentSource
protected deltix.data.stream.MessageSource<T> currentSource
-
currentTime
protected long currentTime
-
currentMessage
protected T currentMessage
-
isAtBeginning
protected boolean isAtBeginning
-
emptySources
protected deltix.util.collections.generated.ObjectHashSet<deltix.data.stream.MessageSource<T>> emptySources
-
checkSources
protected java.util.ArrayList<deltix.data.stream.MessageSource<T>> checkSources
-
asyncException
protected java.lang.RuntimeException asyncException
-
realtimeMessage
protected T realtimeMessage
-
isRealTime
protected boolean isRealTime
-
realTimeStarted
protected boolean realTimeStarted
-
realTimeNotification
protected final boolean realTimeNotification
-
-
Constructor Detail
-
MessageSourceMultiplexer
public MessageSourceMultiplexer()
-
MessageSourceMultiplexer
public MessageSourceMultiplexer(boolean ascending, boolean realTimeNotification, java.util.Comparator<T> c)
-
MessageSourceMultiplexer
public MessageSourceMultiplexer(boolean ascending, boolean realTimeNotification)
-
MessageSourceMultiplexer
@SafeVarargs public MessageSourceMultiplexer(deltix.data.stream.MessageSource<T>... feeds)
-
MessageSourceMultiplexer
public MessageSourceMultiplexer(java.util.List<deltix.data.stream.MessageSource<T>> feeds)
-
-
Method Detail
-
setAvailabilityListener
public void setAvailabilityListener(java.lang.Runnable lnr)
-
getExceptionHandler
public MessageSourceMultiplexer.ExceptionHandler getExceptionHandler()
-
setExceptionHandler
public void setExceptionHandler(MessageSourceMultiplexer.ExceptionHandler handler)
-
isLive
public boolean isLive()
-
setLive
public void setLive(boolean live)
-
realTimeAvailable
public boolean realTimeAvailable()
If true, the multiplexer (MSM) expects a real-time message (RealTimeStartMessage) from each added source and emits RealTimeStartMessage only once all sources have switched to real-time mode. If true, all added message sources are expected to be RealTimeMessageSource instances with realTimeAvailable() = true.
- Specified by:
realTimeAvailable in interface RealTimeMessageSource<T extends deltix.data.stream.TimeStampedMessage>
- Returns:
- true if MSM supports real-time mode.
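The "emit only once all sources have switched" rule can be sketched in isolation. The class below is a hypothetical illustration, not part of the deltix API: RealTimeTrackerSketch, addSource and onRealTimeStart are invented names, and sources are identified by plain strings. It assumes every source is registered before it reports real-time mode, and it does not model sources added after the notification has fired.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker for the "all sources are real-time" condition. */
final class RealTimeTrackerSketch {
    // Sources that are still replaying historical data.
    private final Set<String> pending = new HashSet<>();
    private boolean started;

    void addSource(String id) {
        pending.add(id);
    }

    /**
     * Records that one source has switched to real-time mode.
     * Returns true exactly once: when the last pending source switches,
     * which is the point at which a single notification would be emitted.
     */
    boolean onRealTimeStart(String id) {
        pending.remove(id);
        if (!started && pending.isEmpty()) {
            started = true;
            return true;
        }
        return false;
    }

    boolean isRealTimeStarted() {
        return started;
    }
}
```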
-
isRealTime
public boolean isRealTime()
- Specified by:
isRealTime in interface RealTimeMessageSource<T extends deltix.data.stream.TimeStampedMessage>
- Returns:
- true if this source has already switched from the historical to the real-time data portion
-
isRealTimeStarted
public boolean isRealTimeStarted()
-
createRealTimeMessage
public T createRealTimeMessage()
-
isRealTimeMessage
public boolean isRealTimeMessage(T message)
-
getAllowLateOutOfOrder
public boolean getAllowLateOutOfOrder()
-
setAllowLateOutOfOrder
public void setAllowLateOutOfOrder(boolean allowLateOutOfOrder)
-
closeAndRemove
public boolean closeAndRemove(deltix.data.stream.MessageSource<T> feed)
Removes the specified input feed from this multiplexer. If the cursor is not at the beginning and the current feed is removed, the current message becomes null. The cursor, however, is not advanced until next() is called.
- Parameters:
feed - The feed to close and remove.
- Returns:
- Whether the current message was removed.
-
remove
public boolean remove(deltix.data.stream.MessageSource<T> feed)
-
add
public void add(deltix.data.stream.MessageSource<T> feed)
Adds a new feed without fast-forwarding it.
- Parameters:
feed - The new feed
-
add
public void add(deltix.data.stream.MessageSource<T> feed, long fastForwardToTime)
Adds a new feed and fast-forwards it (rewinds if descending order), if necessary, until the specified timestamp, unless the allowLateOutOfOrder flag is not set. If the feed implements IntermittentlyAvailableResource, an availability listener is automatically installed in it.
- Parameters:
feed - The new feed
fastForwardToTime - Scroll the feed to this timestamp.
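Fast-forwarding can be pictured as skipping messages until one at or after the target timestamp is found. The sketch below is a simplified illustration under stated assumptions, not the library's code: FastForwardSketch and fastForward are hypothetical names, the feed is a plain iterator of long timestamps in ascending order, and the allowLateOutOfOrder flag and the descending (rewind) case are not modeled.

```java
import java.util.Iterator;
import java.util.OptionalLong;

/** Hypothetical illustration of fast-forwarding an ascending feed. */
final class FastForwardSketch {

    /**
     * Consumes timestamps from the feed until one is at or after
     * fastForwardToTime. Returns that timestamp, or an empty result
     * if the feed runs out first.
     */
    static OptionalLong fastForward(Iterator<Long> feed, long fastForwardToTime) {
        while (feed.hasNext()) {
            long time = feed.next();
            if (time >= fastForwardToTime) {
                return OptionalLong.of(time);   // first message to keep
            }
            // Earlier messages are skipped.
        }
        return OptionalLong.empty();
    }
}
```

After the call, the iterator is positioned just past the returned message, so the caller can continue reading the remainder of the feed.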
-
closeFeed
protected final void closeFeed(deltix.data.stream.MessageSource<T> feed)
-
addEmptySource
protected void addEmptySource(deltix.data.stream.MessageSource<T> feed)
-
handleException
protected final boolean handleException(deltix.data.stream.MessageSource<T> feed, java.lang.RuntimeException rtx)
Returns true if the feed is closed.
- Parameters:
feed -
rtx -
- Returns:
- true if the feed is closed
-
nextAvailable
protected deltix.util.concurrent.NextResult nextAvailable(RealTimeMessageSource<T> feed)
-
moveNext
protected deltix.util.concurrent.NextResult moveNext(deltix.data.stream.MessageSource<T> feed, boolean addEmpty)
-
advanceRealTime
protected final deltix.util.concurrent.NextResult advanceRealTime(RealTimeMessageSource<T> source)
-
addSync
protected final void addSync(deltix.data.stream.MessageSource<T> feed, long fastForwardToTime)
-
clearSources
public void clearSources()
-
reset
public void reset(deltix.data.stream.MessageSource<T>... feeds)
Calls clearSources() and then adds the specified feeds.
- Parameters:
feeds - The feeds to add.
-
reset
public void reset(java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
Calls clearSources() and then adds the specified feeds.
- Parameters:
feeds - The feeds to add.
-
reset
public void reset(long startTime, deltix.data.stream.MessageSource<T>... feeds)
Calls clearSources() and then adds the specified feeds, fast-forwarding each until the timestamp of the current message from the feed is at least startTime.
- Parameters:
startTime - Fast-forward to this time.
feeds - The feeds to add.
-
reset
public void reset(long startTime)
Calls clearSources() and sets the fast-forward time to the specified value.
- Parameters:
startTime - Fast-forward to this time.
-
reset
public void reset(long startTime, java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
Calls clearSources() and then adds the specified feeds, fast-forwarding each until the timestamp of the current message from the feed is at least startTime.
- Parameters:
startTime - Fast-forward to this time.
feeds - The feeds to add.
-
invalidateRealTimeState
public void invalidateRealTimeState(RealTimeMessageSource<T> source)
-
setException
public void setException(java.lang.RuntimeException x)
Asynchronous method that causes next() to throw the specified exception.
- Parameters:
x -
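One way to picture this asynchronous hand-off is a stored exception that the consuming thread checks at the start of each next() call. The sketch below is an assumption-laden illustration rather than the actual implementation: AsyncExceptionSketch is a hypothetical class over a plain iterator, it uses a volatile field for cross-thread visibility, and it clears the stored exception after throwing it once, which the source does not confirm for the real class.

```java
import java.util.Iterator;

/** Hypothetical cursor showing an asynchronously injected exception. */
final class AsyncExceptionSketch {
    private final Iterator<Long> feed;
    // Written by any thread, read by the thread that calls next().
    private volatile RuntimeException asyncException;

    AsyncExceptionSketch(Iterator<Long> feed) {
        this.feed = feed;
    }

    /** May be called from another thread; takes effect on the following next(). */
    void setException(RuntimeException x) {
        asyncException = x;
    }

    /** Advances the cursor, or throws the pending exception if one was set. */
    boolean next() {
        RuntimeException pending = asyncException;
        if (pending != null) {
            asyncException = null;   // assumption: the exception is delivered once
            throw pending;
        }
        if (!feed.hasNext()) {
            return false;
        }
        feed.next();
        return true;
    }
}
```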
-
next
public boolean next()
- Specified by:
next in interface deltix.util.concurrent.AbstractCursor
-
syncNext
public boolean syncNext()
-
syncNext
protected deltix.util.concurrent.NextResult syncNext(boolean throwable)
-
processEmptyQueue
protected final deltix.util.concurrent.NextResult processEmptyQueue(boolean throwable)
-
sendRealTimeMessage
protected final void sendRealTimeMessage(java.lang.String logText)
-
isEmpty
protected boolean isEmpty()
-
isAtEnd
public final boolean isAtEnd()
- Specified by:
isAtEnd in interface deltix.util.concurrent.AbstractCursor
-
getMessage
public final T getMessage()
- Specified by:
getMessage in interface deltix.data.stream.MessageSource<T extends deltix.data.stream.TimeStampedMessage>
-
getCurrentTime
public final long getCurrentTime()
-
getCurrentSource
public deltix.data.stream.MessageSource<T> getCurrentSource()
-
isClosed
public boolean isClosed()
-
syncIsAtEnd
public final boolean syncIsAtEnd()
-
syncGetMessage
public final T syncGetMessage()
-
syncGetCurrentTime
public final long syncGetCurrentTime()
-
syncGetCurrentSource
public deltix.data.stream.MessageSource<T> syncGetCurrentSource()
-
syncIsClosed
public boolean syncIsClosed()
-
close
public void close()
- Specified by:
close in interface java.lang.AutoCloseable
- Specified by:
close in interface java.io.Closeable
- Specified by:
close in interface deltix.util.lang.Disposable
-
-