Package deltix.data.stream
Class MessageSourceMultiplexer<T extends deltix.data.stream.TimeStampedMessage>
- java.lang.Object
-
- deltix.data.stream.MessageSourceMultiplexer<T>
-
- All Implemented Interfaces:
deltix.data.stream.MessageSource<T>, RealTimeMessageSource<T>, deltix.util.concurrent.AbstractCursor, deltix.util.lang.Disposable, java.io.Closeable, java.lang.AutoCloseable
- Direct Known Subclasses:
IAMessageSourceMultiplexer
public class MessageSourceMultiplexer<T extends deltix.data.stream.TimeStampedMessage> extends java.lang.Object implements RealTimeMessageSource<T>
Merges multiple time-sorted message streams into one. Allows dynamic addition and removal of source feeds.
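As an illustration of the merging idea only, the following is a minimal, self-contained sketch of a k-way merge over time-sorted feeds using java.util.PriorityQueue. It does not use the deltix classes: MergeSketch, Head, and the representation of a feed as a list of long timestamps are hypothetical stand-ins, and the actual MessageSourceMultiplexer may be implemented differently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration; not part of the deltix API.
class MergeSketch {
    // One queue entry: the head timestamp of a feed plus the feed it came from.
    private static final class Head {
        final long time;
        final Iterator<Long> feed;

        Head(long time, Iterator<Long> feed) {
            this.time = time;
            this.feed = feed;
        }
    }

    // Merges feeds that are each already sorted ascending by timestamp.
    static List<Long> merge(List<List<Long>> feeds) {
        PriorityQueue<Head> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));

        // Seed the queue with the first message of every non-empty feed.
        for (List<Long> f : feeds) {
            Iterator<Long> it = f.iterator();
            if (it.hasNext()) {
                queue.add(new Head(it.next(), it));
            }
        }

        List<Long> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Emit the earliest head, then refill from the same feed.
            Head h = queue.poll();
            out.add(h.time);
            if (h.feed.hasNext()) {
                queue.add(new Head(h.feed.next(), h.feed));
            }
        }
        return out;
    }
}
```

Calling MergeSketch.merge with the feeds [1, 4, 7] and [2, 3, 9] yields a single ascending sequence. Keeping only one head per feed in the queue is what makes adding or removing a feed between calls cheap in this kind of design.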
-
-
Nested Class Summary
static interface MessageSourceMultiplexer.ExceptionHandler
-
Field Summary
protected java.lang.RuntimeException asyncException
protected java.util.ArrayList<deltix.data.stream.MessageSource<T>> checkSources
protected T currentMessage
protected deltix.data.stream.MessageSource<T> currentSource
protected long currentTime
protected deltix.util.collections.generated.ObjectHashSet<deltix.data.stream.MessageSource<T>> emptySources
protected boolean isAtBeginning
protected boolean isRealTime
protected PriorityQueue<T> queue
protected T realtimeMessage
protected boolean realTimeNotification
protected boolean realTimeStarted
-
Constructor Summary
MessageSourceMultiplexer()
MessageSourceMultiplexer(boolean ascending, boolean realTimeNotification)
MessageSourceMultiplexer(boolean ascending, boolean realTimeNotification, java.util.Comparator<T> c)
MessageSourceMultiplexer(deltix.data.stream.MessageSource<T>... feeds)
MessageSourceMultiplexer(java.util.List<deltix.data.stream.MessageSource<T>> feeds)
-
Method Summary
void add(deltix.data.stream.MessageSource<T> feed)
Adds a new feed without fast-forwarding it.
void add(deltix.data.stream.MessageSource<T> feed, long fastForwardToTime)
Adds a new feed and fast-forwards it (rewinds if descending order), if necessary, until the specified timestamp, unless the allowLateOutOfOrder flag is not set.
protected void addEmptySource(deltix.data.stream.MessageSource<T> feed)
protected void addSync(deltix.data.stream.MessageSource<T> feed, long fastForwardToTime)
protected deltix.util.concurrent.NextResult advanceRealTime(RealTimeMessageSource<T> source)
void clearSources()
void close()
boolean closeAndRemove(deltix.data.stream.MessageSource<T> feed)
Removes the specified input feed from this multiplexer.
protected void closeFeed(deltix.data.stream.MessageSource<T> feed)
T createRealTimeMessage()
boolean getAllowLateOutOfOrder()
deltix.data.stream.MessageSource<T> getCurrentSource()
long getCurrentTime()
MessageSourceMultiplexer.ExceptionHandler getExceptionHandler()
T getMessage()
protected boolean handleException(deltix.data.stream.MessageSource<T> feed, java.lang.RuntimeException rtx)
Returns true if the feed is closed.
void invalidateRealTimeState(RealTimeMessageSource<T> source)
boolean isAtEnd()
boolean isClosed()
protected boolean isEmpty()
boolean isLive()
boolean isRealTime()
boolean isRealTimeMessage(T message)
boolean isRealTimeStarted()
protected deltix.util.concurrent.NextResult moveNext(deltix.data.stream.MessageSource<T> feed, boolean addEmpty)
boolean next()
protected deltix.util.concurrent.NextResult nextAvailable(RealTimeMessageSource<T> feed)
protected deltix.util.concurrent.NextResult processEmptyQueue(boolean throwable)
boolean realTimeAvailable()
If true, the MSM expects a real-time message (RealTimeStartMessage) from each added source and emits RealTimeStartMessage only when all sources have switched to real-time mode.
boolean remove(deltix.data.stream.MessageSource<T> feed)
void reset(long startTime)
Calls clearSources(), then sets the fast-forward time to the specified value.
void reset(long startTime, deltix.data.stream.MessageSource<T>... feeds)
Calls clearSources(), then adds the specified feeds, fast-forwarding each until the timestamp of the current message from the feed is at least startTime.
void reset(long startTime, java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
Calls clearSources(), then adds the specified feeds, fast-forwarding each until the timestamp of the current message from the feed is at least startTime.
void reset(deltix.data.stream.MessageSource<T>... feeds)
Calls clearSources(), then adds the specified feeds.
void reset(java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
Calls clearSources(), then adds the specified feeds.
protected void sendRealTimeMessage(java.lang.String logText)
void setAllowLateOutOfOrder(boolean allowLateOutOfOrder)
void setAvailabilityListener(java.lang.Runnable lnr)
void setException(java.lang.RuntimeException x)
Asynchronous method which will cause next() to throw the specified exception.
void setExceptionHandler(MessageSourceMultiplexer.ExceptionHandler handler)
void setLive(boolean live)
deltix.data.stream.MessageSource<T> syncGetCurrentSource()
long syncGetCurrentTime()
T syncGetMessage()
boolean syncIsAtEnd()
boolean syncIsClosed()
boolean syncNext()
protected deltix.util.concurrent.NextResult syncNext(boolean throwable)
-
-
-
Field Detail
-
queue
protected PriorityQueue<T> queue
-
currentSource
protected deltix.data.stream.MessageSource<T> currentSource
-
currentTime
protected long currentTime
-
currentMessage
protected T currentMessage
-
isAtBeginning
protected boolean isAtBeginning
-
emptySources
protected deltix.util.collections.generated.ObjectHashSet<deltix.data.stream.MessageSource<T>> emptySources
-
checkSources
protected java.util.ArrayList<deltix.data.stream.MessageSource<T>> checkSources
-
asyncException
protected java.lang.RuntimeException asyncException
-
realtimeMessage
protected T realtimeMessage
-
isRealTime
protected boolean isRealTime
-
realTimeStarted
protected boolean realTimeStarted
-
realTimeNotification
protected final boolean realTimeNotification
-
-
Constructor Detail
-
MessageSourceMultiplexer
public MessageSourceMultiplexer()
-
MessageSourceMultiplexer
public MessageSourceMultiplexer(boolean ascending, boolean realTimeNotification, java.util.Comparator<T> c)
-
MessageSourceMultiplexer
public MessageSourceMultiplexer(boolean ascending, boolean realTimeNotification)
-
MessageSourceMultiplexer
@SafeVarargs public MessageSourceMultiplexer(deltix.data.stream.MessageSource<T>... feeds)
-
MessageSourceMultiplexer
public MessageSourceMultiplexer(java.util.List<deltix.data.stream.MessageSource<T>> feeds)
-
-
Method Detail
-
setAvailabilityListener
public void setAvailabilityListener(java.lang.Runnable lnr)
-
getExceptionHandler
public MessageSourceMultiplexer.ExceptionHandler getExceptionHandler()
-
setExceptionHandler
public void setExceptionHandler(MessageSourceMultiplexer.ExceptionHandler handler)
-
isLive
public boolean isLive()
-
setLive
public void setLive(boolean live)
-
realTimeAvailable
public boolean realTimeAvailable()
If true, the MSM expects a real-time message (RealTimeStartMessage) from each added source and emits RealTimeStartMessage only when all sources have switched to real-time mode. If true, all added message sources are expected to be RealTimeMessageSource instances having realTimeAvailable() = true.
- Specified by:
realTimeAvailable in interface RealTimeMessageSource<T extends deltix.data.stream.TimeStampedMessage>
- Returns:
- true if MSM supports real-time mode.
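The "emit only when all sources have switched" rule described above can be sketched as a small gate. This is an assumption-laden illustration, not the library's code: RealTimeGateSketch, onSourceRealTimeStart, and the use of string identifiers for sources are all hypothetical, and the sketch ignores sources being added or removed after construction.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration; not part of the deltix API.
class RealTimeGateSketch {
    private final int totalSources;
    private final Set<String> realTimeSources = new HashSet<>();
    private boolean started = false;

    RealTimeGateSketch(int totalSources) {
        this.totalSources = totalSources;
    }

    // Called when one source reports its own real-time start marker.
    // Returns true exactly once: when the last remaining source switches,
    // which is the moment the merged stream would emit its own marker.
    boolean onSourceRealTimeStart(String sourceId) {
        realTimeSources.add(sourceId);
        if (!started && realTimeSources.size() == totalSources) {
            started = true;
            return true;
        }
        return false;
    }
}
```

With two sources, the gate stays closed after the first source switches (even if it reports twice) and opens when the second one does.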
-
isRealTime
public boolean isRealTime()
- Specified by:
isRealTime in interface RealTimeMessageSource<T extends deltix.data.stream.TimeStampedMessage>
- Returns:
- true if this source has already switched from the historical to the real-time data portion
-
isRealTimeStarted
public boolean isRealTimeStarted()
-
createRealTimeMessage
public T createRealTimeMessage()
-
isRealTimeMessage
public boolean isRealTimeMessage(T message)
-
getAllowLateOutOfOrder
public boolean getAllowLateOutOfOrder()
-
setAllowLateOutOfOrder
public void setAllowLateOutOfOrder(boolean allowLateOutOfOrder)
-
closeAndRemove
public boolean closeAndRemove(deltix.data.stream.MessageSource<T> feed)
Removes the specified input feed from this multiplexer. If the cursor is not at the beginning and the current feed is removed, the current message becomes null. The cursor, however, is not advanced until next() is called.
- Parameters:
feed - The feed to close and remove.
- Returns:
- true if the current message was removed.
-
remove
public boolean remove(deltix.data.stream.MessageSource<T> feed)
-
add
public void add(deltix.data.stream.MessageSource<T> feed)
Adds a new feed without fast-forwarding it.
- Parameters:
feed - The new feed
-
add
public void add(deltix.data.stream.MessageSource<T> feed, long fastForwardToTime)
Adds a new feed and fast-forwards it (rewinds if descending order), if necessary, until the specified timestamp, unless the allowLateOutOfOrder flag is not set. If the feed implements IntermittentlyAvailableResource, an availability listener is automatically installed in it.
- Parameters:
feed - The new feed
fastForwardToTime - Scroll the feed to this timestamp.
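Fast-forwarding can be pictured as skipping messages until the feed's current timestamp reaches the target. The sketch below is a hedged illustration: FastForwardSketch and fastForward are hypothetical names, a feed is modeled as an iterator of timestamps, and the descending case (skip until the timestamp is at most the target) is an assumption about what "rewinds" means here.

```java
import java.util.Iterator;

// Hypothetical illustration; not part of the deltix API.
class FastForwardSketch {
    // Advances the feed and returns the first timestamp that has reached
    // the target in iteration order, or null if the feed is exhausted first.
    // Ascending feeds stop at the first timestamp >= target; descending
    // feeds (assumed behavior) stop at the first timestamp <= target.
    static Long fastForward(Iterator<Long> feed, long target, boolean ascending) {
        while (feed.hasNext()) {
            long t = feed.next();
            boolean reached = ascending ? t >= target : t <= target;
            if (reached) {
                return t;
            }
        }
        return null;
    }
}
```

The message returned here would become the feed's head in the merge queue; everything before it is discarded.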
-
closeFeed
protected final void closeFeed(deltix.data.stream.MessageSource<T> feed)
-
addEmptySource
protected void addEmptySource(deltix.data.stream.MessageSource<T> feed)
-
handleException
protected final boolean handleException(deltix.data.stream.MessageSource<T> feed, java.lang.RuntimeException rtx)
Returns true if the feed is closed.
- Parameters:
feed -
rtx -
- Returns:
- true if the feed is closed
-
nextAvailable
protected deltix.util.concurrent.NextResult nextAvailable(RealTimeMessageSource<T> feed)
-
moveNext
protected deltix.util.concurrent.NextResult moveNext(deltix.data.stream.MessageSource<T> feed, boolean addEmpty)
-
advanceRealTime
protected final deltix.util.concurrent.NextResult advanceRealTime(RealTimeMessageSource<T> source)
-
addSync
protected final void addSync(deltix.data.stream.MessageSource<T> feed, long fastForwardToTime)
-
clearSources
public void clearSources()
-
reset
public void reset(deltix.data.stream.MessageSource<T>... feeds)
Calls clearSources(), then adds the specified feeds.
- Parameters:
feeds - The feeds to add.
-
reset
public void reset(java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
Calls clearSources(), then adds the specified feeds.
- Parameters:
feeds - The feeds to add.
-
reset
public void reset(long startTime, deltix.data.stream.MessageSource<T>... feeds)
Calls clearSources(), then adds the specified feeds, fast-forwarding each until the timestamp of the current message from the feed is at least startTime.
- Parameters:
startTime - Fast-forward to this time.
feeds - The feeds to add.
-
reset
public void reset(long startTime)
Calls clearSources(), then sets the fast-forward time to the specified value.
- Parameters:
startTime - Fast-forward to this time.
-
reset
public void reset(long startTime, java.util.Collection<deltix.data.stream.MessageSource<T>> feeds)
Calls clearSources(), then adds the specified feeds, fast-forwarding each until the timestamp of the current message from the feed is at least startTime.
- Parameters:
startTime - Fast-forward to this time.
feeds - The feeds to add.
-
invalidateRealTimeState
public void invalidateRealTimeState(RealTimeMessageSource<T> source)
-
setException
public void setException(java.lang.RuntimeException x)
Asynchronous method which will cause next() to throw the specified exception.
- Parameters:
x -
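One common way to get this "set from another thread, thrown by the consumer" behavior is to park the exception in a thread-safe slot that next() checks first. The sketch below assumes that pattern; AsyncExceptionSketch is a hypothetical class, and both the use of AtomicReference and the clearing of the exception after it is thrown are assumptions rather than documented behavior of MessageSourceMultiplexer.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; not part of the deltix API.
class AsyncExceptionSketch {
    // Slot written by any thread and read by the consuming thread.
    private final AtomicReference<RuntimeException> pending = new AtomicReference<>();

    // May be called from any thread; does not throw by itself.
    void setException(RuntimeException x) {
        pending.set(x);
    }

    // The consumer sees the failure on its next call.
    boolean next() {
        // Assumption: the exception is thrown once and then cleared.
        RuntimeException x = pending.getAndSet(null);
        if (x != null) {
            throw x;
        }
        return true; // stand-in for "a message is available"
    }
}
```

The effect is that a background failure surfaces on the thread that is iterating the cursor, at a well-defined point, instead of being lost on the thread where it occurred.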
-
next
public boolean next()
- Specified by:
next in interface deltix.util.concurrent.AbstractCursor
-
syncNext
public boolean syncNext()
-
syncNext
protected deltix.util.concurrent.NextResult syncNext(boolean throwable)
-
processEmptyQueue
protected final deltix.util.concurrent.NextResult processEmptyQueue(boolean throwable)
-
sendRealTimeMessage
protected final void sendRealTimeMessage(java.lang.String logText)
-
isEmpty
protected boolean isEmpty()
-
isAtEnd
public final boolean isAtEnd()
- Specified by:
isAtEnd in interface deltix.util.concurrent.AbstractCursor
-
getMessage
public final T getMessage()
- Specified by:
getMessage in interface deltix.data.stream.MessageSource<T extends deltix.data.stream.TimeStampedMessage>
-
getCurrentTime
public final long getCurrentTime()
-
getCurrentSource
public deltix.data.stream.MessageSource<T> getCurrentSource()
-
isClosed
public boolean isClosed()
-
syncIsAtEnd
public final boolean syncIsAtEnd()
-
syncGetMessage
public final T syncGetMessage()
-
syncGetCurrentTime
public final long syncGetCurrentTime()
-
syncGetCurrentSource
public deltix.data.stream.MessageSource<T> syncGetCurrentSource()
-
syncIsClosed
public boolean syncIsClosed()
-
close
public void close()
- Specified by:
close in interface java.lang.AutoCloseable
- Specified by:
close in interface java.io.Closeable
- Specified by:
close in interface deltix.util.lang.Disposable
-
-