Streams
Durable Streams
The content of durable streams is persisted on disk.
Durable messages are stored in Time Slice Files (TSF) of a predefined size. A specific feature of this storage model is that data in the latest TSF is kept in system memory until it is persisted. Until that data is flushed to disk, there is a risk of losing it.
This matters most for rarely updated streams, where data in the latest TSF may stay unpersisted for a long time because the file capacity has not been reached. To mitigate the risk of losing data, durable streams can be configured to flush data to disk at a predefined time, and the size of the TSF can be adjusted as well. Data can be flushed automatically by TimeBase or via the API on the data loader side.
// Create a durable stream
RecordClassDescriptor classDescriptor = Introspector.createMessageIntrospector().introspectRecordClass(TestMessage.class);
...
String streamName = "durable";
// Arguments: scope, stream name, description, distribution factor, message class descriptor
StreamOptions streamOptions = StreamOptions.fixedType(StreamScope.DURABLE, streamName, streamName, 0, classDescriptor);
DXTickStream stream = db.createStream(streamName, streamOptions);
...
Note: You can also create a durable stream by running a Data Definition Language (DDL) command.
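The trade-off between size-based and time-based flushing described above can be illustrated with a small, self-contained model. This is a sketch only and not TimeBase code: the class `SliceFlushSketch`, its fields, and the idea of measuring the slice in message counts are all assumptions made for illustration. It shows why a rarely updated stream keeps data at risk when only the size limit triggers a flush.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a time slice held in memory until flushed. Not TimeBase code. */
final class SliceFlushSketch {
    private final int maxMessagesPerSlice; // hypothetical stand-in for the TSF size limit
    private final long flushIntervalMs;    // hypothetical periodic flush setting; 0 disables it
    private final List<String> inMemory = new ArrayList<>();
    private final List<String> onDisk = new ArrayList<>();
    private long lastFlushMs;

    SliceFlushSketch(int maxMessagesPerSlice, long flushIntervalMs, long nowMs) {
        this.maxMessagesPerSlice = maxMessagesPerSlice;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Appends a message to the in-memory slice, then applies the flush rules. */
    void write(String message, long nowMs) {
        inMemory.add(message);
        flushIfNeeded(nowMs);
    }

    /** Stands in for a background timer; the caller supplies the clock. */
    void tick(long nowMs) {
        flushIfNeeded(nowMs);
    }

    /** Explicit flush, as a data loader might request. */
    void flush(long nowMs) {
        onDisk.addAll(inMemory);
        inMemory.clear();
        lastFlushMs = nowMs;
    }

    private void flushIfNeeded(long nowMs) {
        boolean sliceFull = inMemory.size() >= maxMessagesPerSlice;
        boolean intervalElapsed = flushIntervalMs > 0 && nowMs - lastFlushMs >= flushIntervalMs;
        if (!inMemory.isEmpty() && (sliceFull || intervalElapsed)) {
            flush(nowMs);
        }
    }

    int atRisk() { return inMemory.size(); }

    int persisted() { return onDisk.size(); }

    public static void main(String[] args) {
        // Size-only policy: one message written, slice never fills up.
        SliceFlushSketch sizeOnly = new SliceFlushSketch(1000, 0, 0);
        sizeOnly.write("rare update", 10);
        sizeOnly.tick(60_000);
        System.out.println("size-only, messages at risk: " + sizeOnly.atRisk());

        // Same traffic with a 5-second flush interval.
        SliceFlushSketch timed = new SliceFlushSketch(1000, 5_000, 0);
        timed.write("rare update", 10);
        timed.tick(60_000);
        System.out.println("timed, messages at risk: " + timed.atRisk());
    }
}
```

In the size-only case the single message stays in memory after a minute, while the timed policy has already moved it to the persisted list.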
Transient Streams
Transient types of TimeBase streams are used for message brokerage. Transient messages are stored in the system circular memory buffer. When data in a transient stream is read by all readers (cursors), it is discarded.
Example
Suppose that Cursors 1-3 are the only registered readers, and Cursor 3 is the slowest one (behind the other two). When Cursor 3 reads the next message (on the far left of the box), that message "falls off the queue" and is discarded. If Cursor 4 were to subscribe to this stream at that moment, it would never get the discarded message. In fact, if no cursors are subscribed to a transient stream, all messages written to it are discarded immediately.
API-wise, a transient stream in TimeBase is almost indistinguishable from a durable stream. Multiple clients can subscribe to a transient stream in exactly the same way as to a durable one. A client can even open a cursor that is subscribed to several streams at the same time, some durable and some transient.
// Create a transient stream
RecordClassDescriptor classDescriptor = Introspector.createMessageIntrospector().introspectRecordClass(TestMessage.class);
...
String streamName = "transient";
// Same call as for a durable stream; only the scope differs
StreamOptions streamOptions = StreamOptions.fixedType(StreamScope.TRANSIENT, streamName, streamName, 0, classDescriptor);
DXTickStream stream = db.createStream(streamName, streamOptions);
...
Note: You can also create a transient stream by running a Data Definition Language (DDL) command.
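The discard rule from the example with Cursors 1-4 can be modelled in a few lines of plain Java. The sketch below is not TimeBase code; `TransientQueueSketch` and its `Cursor` class are hypothetical names. It also assumes that a newly subscribed cursor starts at the oldest message still in the buffer, which the text above implies but does not state outright.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transient stream: a message is dropped once every cursor has read it. */
final class TransientQueueSketch {
    final class Cursor {
        private long nextSeq; // sequence number of the next message this cursor will read

        private Cursor(long nextSeq) { this.nextSeq = nextSeq; }

        /** Returns the next unread message, or null if the cursor has caught up. */
        String read() {
            if (nextSeq >= headSeq + buffer.size()) {
                return null;
            }
            String message = buffer.get((int) (nextSeq - headSeq));
            nextSeq++;
            trim();
            return message;
        }
    }

    private final List<String> buffer = new ArrayList<>();
    private final List<Cursor> cursors = new ArrayList<>();
    private long headSeq = 0; // sequence number of buffer.get(0)

    /** Assumption: a new cursor starts at the oldest message still buffered. */
    Cursor subscribe() {
        Cursor cursor = new Cursor(headSeq);
        cursors.add(cursor);
        return cursor;
    }

    void publish(String message) {
        if (cursors.isEmpty()) {
            return; // no readers: the message is discarded immediately
        }
        buffer.add(message);
    }

    /** Drops every message that all cursors have already read. */
    private void trim() {
        long slowest = Long.MAX_VALUE;
        for (Cursor cursor : cursors) {
            slowest = Math.min(slowest, cursor.nextSeq);
        }
        int readByAll = (int) (slowest - headSeq);
        buffer.subList(0, readByAll).clear();
        headSeq += readByAll;
    }

    int buffered() { return buffer.size(); }

    public static void main(String[] args) {
        TransientQueueSketch stream = new TransientQueueSketch();
        stream.publish("written with no readers"); // discarded
        Cursor c1 = stream.subscribe();
        Cursor c2 = stream.subscribe();
        Cursor c3 = stream.subscribe();
        stream.publish("m1");
        stream.publish("m2");
        c1.read(); c1.read();
        c2.read(); c2.read();
        System.out.println("before slowest cursor reads: " + stream.buffered());
        c3.read(); // m1 has now been read by everyone and is dropped
        System.out.println("after slowest cursor reads: " + stream.buffered());
        Cursor c4 = stream.subscribe();
        System.out.println("late subscriber sees first: " + c4.read());
    }
}
```

The late subscriber `c4` receives `m2` but never `m1`, which matches the behavior described for Cursor 4.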
Memory Management for Transient Streams
As stated earlier, transient messages are stored in a circular memory buffer. If any cursor (reader) consumes stream data more slowly than it is written, memory pressure starts to grow. The memory buffer then expands within predefined limits:
- Maximum amount of memory the buffer may use, in bytes. Set this limit to a reasonable number, depending on the amount of available memory.
- Maximum time difference between the head and the tail of the buffer. This is important when buffering live data and you do not need data older than the specified time range (e.g. 30 seconds).
The two limits can be used independently or together. What happens when a limit is exceeded and the buffer fills up depends on the type of the transient stream.
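As a rough illustration of how the two limits combine, the sketch below checks a buffer against a byte limit and a time-depth limit. It is not TimeBase code: `BufferLimitsSketch`, its constructor parameters, and the convention that `0` disables a limit are assumptions made for this example. It only detects that a limit is exceeded and deliberately does not decide what to do next.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy check of a buffer against a size limit and a time-depth limit. Not TimeBase code. */
final class BufferLimitsSketch {
    private final long maxBytes;       // 0 means "no size limit" in this sketch
    private final long maxTimeDepthMs; // 0 means "no time limit" in this sketch
    private final Deque<long[]> entries = new ArrayDeque<>(); // each entry: {timestampMs, sizeBytes}
    private long usedBytes;

    BufferLimitsSketch(long maxBytes, long maxTimeDepthMs) {
        this.maxBytes = maxBytes;
        this.maxTimeDepthMs = maxTimeDepthMs;
    }

    void add(long timestampMs, int sizeBytes) {
        entries.addLast(new long[] {timestampMs, sizeBytes});
        usedBytes += sizeBytes;
    }

    /** True when either configured limit is exceeded. */
    boolean overLimit() {
        if (entries.isEmpty()) {
            return false;
        }
        // Time depth: difference between the newest and the oldest buffered message.
        long timeDepthMs = entries.peekLast()[0] - entries.peekFirst()[0];
        boolean tooBig = maxBytes > 0 && usedBytes > maxBytes;
        boolean tooOld = maxTimeDepthMs > 0 && timeDepthMs > maxTimeDepthMs;
        return tooBig || tooOld;
    }

    public static void main(String[] args) {
        BufferLimitsSketch buffer = new BufferLimitsSketch(1_000, 30_000);
        buffer.add(0, 400);
        buffer.add(10_000, 400);
        System.out.println("within limits: " + !buffer.overLimit());
        buffer.add(20_000, 400); // 1200 bytes in total, above the 1000-byte limit
        System.out.println("over limit: " + buffer.overLimit());
    }
}
```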
There are two types of transient streams:
- Lossy: the most common transient stream type. When the buffer is full, writers (loaders) continue to write messages (on the right side of the diagram). This causes messages on the left side of the diagram to fall off and be discarded, even though they have not been read. One or more slower readers may then miss some messages and jump to the next available message. TimeBase notifies slow readers that they are jumping over a gap.
- Lossless: a less common transient stream type. When the buffer is full, writers (loaders) are blocked until the slowest reader frees up some room by reading more messages. A lossless stream may be the right option when data loss is not tolerable. However, even a single slow reader can slow down or completely stop the dispatching of messages, even if all other readers are able to handle the flow. The lossless configuration is rarely used in practice, but TimeBase makes it available.
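The difference between the two types can be shown with a single-threaded toy buffer. This is a sketch under simplifying assumptions and not TimeBase code: `LossyLosslessSketch` is a hypothetical class, the capacity is counted in messages rather than bytes, there is only one reader, and a blocked writer is represented by `write` returning `false` instead of actually blocking a thread. The gap notice is rendered as a text prefix purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy bounded buffer with one reader, in lossy or lossless mode. Not TimeBase code. */
final class LossyLosslessSketch {
    private final int capacity;
    private final boolean lossless;
    private final Deque<String> buffer = new ArrayDeque<>();
    private int droppedSinceLastRead;

    LossyLosslessSketch(int capacity, boolean lossless) {
        this.capacity = capacity;
        this.lossless = lossless;
    }

    /** Returns false when a lossless buffer is full; a real writer would block here instead. */
    boolean write(String message) {
        if (buffer.size() == capacity) {
            if (lossless) {
                return false;
            }
            buffer.removeFirst(); // lossy: the oldest message falls off, read or not
            droppedSinceLastRead++;
        }
        buffer.addLast(message);
        return true;
    }

    /** Returns the next message, prefixed with a gap notice if messages were skipped; null if empty. */
    String read() {
        String message = buffer.pollFirst();
        if (message == null) {
            return null;
        }
        int gap = droppedSinceLastRead;
        droppedSinceLastRead = 0;
        return gap > 0 ? "[gap of " + gap + "] " + message : message;
    }

    public static void main(String[] args) {
        LossyLosslessSketch lossy = new LossyLosslessSketch(2, false);
        lossy.write("a");
        lossy.write("b");
        lossy.write("c"); // buffer full: "a" is dropped unread
        System.out.println(lossy.read());

        LossyLosslessSketch lossless = new LossyLosslessSketch(2, true);
        lossless.write("a");
        lossless.write("b");
        System.out.println("third write accepted: " + lossless.write("c"));
        lossless.read(); // the reader frees one slot
        System.out.println("retry accepted: " + lossless.write("c"));
    }
}
```

In lossy mode the reader resumes at `b` and is told it skipped one message. In lossless mode nothing is lost, but the writer cannot make progress until the reader catches up.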
// Create a lossless transient stream
...
RecordClassDescriptor classDescriptor = Introspector.createMessageIntrospector().introspectRecordClass(TestMessage.class);
...
String streamName = "transient-lossless";
StreamOptions streamOptions = StreamOptions.fixedType(StreamScope.TRANSIENT, streamName, streamName, 0, classDescriptor);
// Block writers instead of dropping unread messages when the buffer is full
streamOptions.bufferOptions = new BufferOptions();
streamOptions.bufferOptions.lossless = true;
DXTickStream stream = db.createStream(streamName, streamOptions);
...
Latest Events Snapshot
Consider a case where trading portfolio events are published into TimeBase: every time a certain trading instrument changes, we send a message to TimeBase. Some business cases require a snapshot of the last values for each trading instrument in the portfolio. One solution is to use a reverse cursor to scan the entire stream backwards in search of the required messages. Because TimeBase stores data in chronological order, this approach may be inefficient and time-consuming: it may mean going through all historical messages one by one, looking for the right ones.
TimeBase offers a better solution to this problem: the unique stream option (see the example below, streamOptions.unique = true;), which can be applied to both durable and transient streams. This option provides a snapshot of the latest stream messages out of the box. When enabled, the stream keeps a cache of the last positions for each stream symbol in the TimeBase server cache. Any new subscriber (cursor) receives a snapshot of all known positions immediately before any live messages. You may inspect timestamps to distinguish between live data and historical data snapshots.
The cache key is the message primary key (pk), which is defined in the class schema. If the schema does not contain any fields marked as pk, then the symbol name is used as the primary key. When a user creates a cursor on a unique stream, the stream pushes the entire cache ahead of the stream data. The cache is flushed to disk periodically and, after a system restart, contains the latest messages of each class per key.
Unique streams behave like regular (durable or transient) streams and additionally maintain an internal cache of messages. No special action or API changes are required to publish data into a unique stream, and no special action is required from a subscriber (cursor) to read from one.
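The "latest message per key, delivered ahead of live data" behavior can be pictured with a small in-memory model. The sketch below is not how TimeBase implements the cache; `UniqueStreamSketch`, its `Message` class, and the use of the symbol as the only key are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a stream that caches the last message per key. Not TimeBase code. */
final class UniqueStreamSketch {
    static final class Message {
        final String symbol;    // used as the cache key in this sketch
        final long timestampMs;
        final double value;

        Message(String symbol, long timestampMs, double value) {
            this.symbol = symbol;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final Map<String, Message> lastByKey = new LinkedHashMap<>();
    private final List<List<Message>> inboxes = new ArrayList<>();

    /** Updates the per-key cache and delivers the message to every subscriber. */
    void publish(Message message) {
        lastByKey.put(message.symbol, message);
        for (List<Message> inbox : inboxes) {
            inbox.add(message);
        }
    }

    /** A new subscriber's inbox starts with the cached snapshot; live messages follow. */
    List<Message> subscribe() {
        List<Message> inbox = new ArrayList<>(lastByKey.values());
        inboxes.add(inbox);
        return inbox;
    }

    public static void main(String[] args) {
        UniqueStreamSketch stream = new UniqueStreamSketch();
        stream.publish(new Message("AAPL", 1, 100.0));
        stream.publish(new Message("MSFT", 2, 200.0));
        stream.publish(new Message("AAPL", 3, 101.0)); // replaces the cached AAPL message

        long subscribedAtMs = 4;
        List<Message> inbox = stream.subscribe();
        stream.publish(new Message("MSFT", 5, 201.0));

        for (Message m : inbox) {
            String kind = m.timestampMs < subscribedAtMs ? "snapshot" : "live";
            System.out.println(kind + " " + m.symbol + " " + m.value);
        }
    }
}
```

The subscriber receives one snapshot message per symbol without scanning history, and separates snapshot from live messages by comparing timestamps with its own subscription time.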
// Create a durable stream with a unique cache
RecordClassDescriptor classDescriptor = Introspector.createMessageIntrospector().introspectRecordClass(TestMessage.class);
...
String streamName = "durable-unique";
StreamOptions streamOptions = StreamOptions.fixedType(StreamScope.DURABLE, streamName, streamName, 0, classDescriptor);
// Keep a cache of the latest message per key and send it to new cursors first
streamOptions.unique = true;
DXTickStream stream = db.createStream(streamName, streamOptions);
...
...
// Consume data from a durable stream with a unique stream option
SelectionOptions options = new SelectionOptions();
options.rebroadcast = true;
long startTime = System.currentTimeMillis();
try (TickCursor cursor = stream.select(startTime, options)) {
    while (cursor.next()) {
        InstrumentMessage message = cursor.getMessage();
        if (message.getTimeStampMs() < startTime) {
            // snapshot message
        } else {
            // live message
        }
    }
}
...