Topics

Overview

TimeBase Topics is a part of the TimeBase API that allows achieving lower latency than TimeBase Streams by sending data directly from data producers to data consumers, thus bypassing the TimeBase server.

TimeBase Topics use the Aeron library as transport.

On the API level, TimeBase Topics is compatible with the standard TimeBase cursors/loaders API.

There are several Topic types:

  • UDP_SINGLE_PUBLISHER: Main API. Sends data using UDP. Supports only one producer per Topic. Producer address must be specified at time of Topic creation.
  • IPC: Uses inter-process communication. Works only on the same physical machine between processes that share the same Aeron driver. Supports multiple loaders.
  • MULTICAST: Experimental. Transfers data using UDP multicast. Support of multicast in the network is required. Supports multiple producers.

Main Features

  • Low latency
  • High throughput
  • Same message format as streams
  • Data sent directly from producer to all consumers

Limitations and Considerations

  • Topics have a very limited API. No filtering, no channel multiplexing, no data manipulation, no schema changes, etc.
  • Data is not persisted unless the copyToStream option is set. See the copyToStream Feature section for details.
  • Generally, you need a dedicated CPU core for each Topic consumer unless you:
    • Use a non-blocking API (IntermittentlyAvailableCursor or MessagePoller interface) and
    • Already have a dedicated core for the thread that processes messages from the Topic
  • If the copyToStream option is used, the TimeBase server needs up to one additional CPU core per Topic (possibly less, depending on load).
  • Message timestamps are set on the data producer side. For Topics, it's necessary to ensure that clocks on the producer side are in sync with all other clocks in the setup.
  • Additional remote data consumers add some extra CPU and memory load to the data producer.
  • The slowest data consumers slow down (and block!) other consumers and the data producer if they're unable to handle incoming data bursts.
  • Configuration for Topics is more complex:
    • You have to plan ahead how many Topics you will use, how much data you may need to buffer in Topics to handle message bursts, how many CPU cores you can afford to allocate for Topics, etc.
    • It's necessary to run the Aeron Driver on each machine that uses Topics.
    • Topics require extra configuration effort in containerized setups:
      • It's necessary to correctly share the Aeron Driver directory between containers.
      • It's necessary to plan ahead the amount of native memory to allocate for Topics on each container.

copyToStream Feature

On the Topic level, there is a configuration option called copyToStream that allows writing a copy of the data sent to a given Topic into a TimeBase stream.

When this option is enabled, the TimeBase server becomes a "consumer" for the corresponding Topic and writes the received data into a "traditional" TimeBase stream. Since the server acts as a regular consumer, other consumers may receive the data before it gets written to the stream. In the event of TimeBase server failure or network connectivity loss, the persisted data may not contain some messages that were already processed by other Topic consumers.

Each Topic that has this option enabled adds CPU load to TimeBase (up to one core used per Topic), even if there are no active producers for the Topic. Therefore, using many copyToStream Topics may penalize the TB server's performance.

Messages that reach a stream through a copyToStream Topic are likely to have significantly worse latency than data that is sent directly to a TB stream.

Aeron Driver

To use the Topics feature, configure and run the Aeron Driver on each machine that uses Topics, including:

  • The machine with the TimeBase server
  • All machines using the TimeBase client to interact with Topics

The Aeron Driver runs as a separate process that communicates with the TimeBase client or server on the same machine using memory-mapped files. You only need to run a single instance of the Aeron Driver per machine. Multiple Java processes may share a single Aeron Driver instance.

The process works as follows:

  1. When a TimeBase Topic data producer sends a message, that message gets transferred from the TimeBase client to the Aeron Driver on the same machine using IPC.
  2. The Aeron Driver sends this data to an Aeron Driver on the receiving machine using UDP.
  3. The receiving Aeron Driver passes this data to the TimeBase client using IPC.

Memory Usage Considerations

Topics use shared memory for communication between the Aeron Driver and the TimeBase server and client. Each Topic requires a certain amount of physical memory, which is shared by all processes that use Topics. With default settings, each Topic needs about 50 MB of memory. That memory is allocated as off-heap Java memory and as a memory-mapped file on the file system.

Total memory needed for Topics can be calculated as follows:

TotalTopicTermMemory = 50 MB * <total number of Topics to be used>

If you use containers, add that amount to the memory limit of each container that uses Topics. That memory has to be added to:

  • TimeBase server container memory limit if you use "copyToStream" feature.
  • TimeBase client container memory limit if you use Topics in the client.
  • Memory limit of the Aeron Driver container

caution

Memory dedicated to Topics must not be counted as part of the Java heap memory limit. This means that you can't use percentage-based JVM heap options such as -XX:InitialRAMPercentage and -XX:MaxRAMPercentage with Topics unless you adjust them accordingly.
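
One possible adjustment is to compute an explicit heap size from what remains of the container limit after Topic memory is set aside, and pass it as -Xmx instead of a percentage. The sketch below only illustrates the arithmetic. The class name, the 4096 MB container limit, and the 75% heap share are assumptions chosen for the example, not TimeBase recommendations.

```java
/** Illustrative helper (hypothetical, not part of TimeBase): derives an explicit -Xmx value. */
final class TopicAwareHeapSizing {

    /**
     * @param containerLimitMb  container memory limit, in MB
     * @param topicTermMemoryMb memory reserved for Topics (TotalTopicTermMemory), in MB
     * @param heapSharePercent  share of the remaining memory given to the Java heap (assumed value)
     * @return heap size in MB that could be passed as -Xmx
     */
    static long heapMb(long containerLimitMb, long topicTermMemoryMb, int heapSharePercent) {
        long remainingMb = containerLimitMb - topicTermMemoryMb;
        if (remainingMb <= 0) {
            throw new IllegalArgumentException("Container limit must exceed Topic memory");
        }
        // The percentage applies only to memory that is NOT reserved for Topics.
        return remainingMb * heapSharePercent / 100;
    }

    public static void main(String[] args) {
        // Assumed example values: 4096 MB container, 1500 MB for Topics, 75% heap share.
        long heap = heapMb(4096, 1500, 75);
        System.out.println("-Xms" + heap + "m -Xmx" + heap + "m");
    }
}
```

The remainder left outside the heap still has to cover other off-heap usage of the JVM, so the share should be chosen conservatively.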

caution

It's necessary to specify the amount of memory dedicated to Topics in the TimeBase server process configuration, using either the admin.properties file or a system property:

TimeBase.topics.totalTermMemoryLimit=<TotalTopicTermMemory>MB
# Example
TimeBase.topics.totalTermMemoryLimit=1500MB

The memory limit for the Aeron Driver process can be computed as follows:

AeronDriverContainerMemoryLimit = TotalTopicTermMemory + DriverHeapMemory + FixedOffheapMemory

Where DriverHeapMemory is the value of the -Xmx option for the Aeron Driver process (200 MB recommended) and FixedOffheapMemory is a fixed amount of 50 MB.

So if you want to support 30 Topics, then TotalTopicTermMemory = 50 MB * 30 = 1500 MB.

The memory limit for the Aeron Driver container should then be at least AeronDriverContainerMemoryLimit = 1500 MB + 200 MB + 50 MB = 1750 MB (with -Xms200m -Xmx200m).
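
The same arithmetic can be captured in a small helper, which is convenient when the number of Topics changes between environments. This is a minimal sketch: the class and method names are hypothetical, and the constants are the default figures quoted in this section (about 50 MB per Topic, 50 MB of fixed off-heap memory).

```java
/** Illustrative calculator (hypothetical, not part of TimeBase) for the formulas in this section. */
final class TopicMemoryCalculator {

    /** Approximate memory per Topic with default settings, in MB (figure from this section). */
    static final long TERM_MEMORY_PER_TOPIC_MB = 50;

    /** Fixed off-heap amount for the Aeron Driver process, in MB (figure from this section). */
    static final long FIXED_OFFHEAP_MB = 50;

    /** TotalTopicTermMemory = 50 MB * number of Topics. */
    static long totalTopicTermMemoryMb(int topicCount) {
        if (topicCount < 0) {
            throw new IllegalArgumentException("topicCount must not be negative");
        }
        return TERM_MEMORY_PER_TOPIC_MB * topicCount;
    }

    /** Container limit = TotalTopicTermMemory + DriverHeapMemory + FixedOffheapMemory. */
    static long aeronDriverContainerLimitMb(int topicCount, long driverHeapMb) {
        return totalTopicTermMemoryMb(topicCount) + driverHeapMb + FIXED_OFFHEAP_MB;
    }

    public static void main(String[] args) {
        int topics = 30;
        long driverHeapMb = 200; // matches -Xms200m -Xmx200m
        System.out.println("TimeBase.topics.totalTermMemoryLimit=" + totalTopicTermMemoryMb(topics) + "MB");
        System.out.println("Aeron Driver container limit: "
                + aeronDriverContainerLimitMb(topics, driverHeapMb) + " MB");
    }
}
```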

note

While the specified memory limit is added to each container, the actual physical memory used for Topics is shared between all containers.

Running Aeron Driver

The Aeron Driver needs a path on the file system (preferably on /dev/shm, e.g. /dev/shm/aeron) where the driver can store metadata and place memory-mapped files. All processes on the machine that use Aeron have to be configured with the exact same Aeron Driver path (aeron.dir).

The optimal way to run the Aeron Driver depends on the details of the specific setup.

The simplest way is to use the jars that come with the TimeBase distribution. Quote the classpath so that the shell passes the wildcard to Java unexpanded:

java -Daeron.dir=/YOUR/AERON/DIR -cp "/timebase/distribution/lib/*" io.aeron.driver.MediaDriver

Replace /YOUR/AERON/DIR with your aeron.dir value.
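
Before starting a TimeBase client, it can be useful to check that the configured directory looks like one a driver has initialized. The sketch below is an illustration only: the class name is hypothetical, and it relies on Aeron creating a file named cnc.dat in its directory. The presence of that file does not prove that a driver is currently alive, since a stale file may remain after a crash.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustrative check (hypothetical helper): does a directory look like an initialized Aeron dir? */
final class AeronDirCheck {

    /** Returns true if the directory exists and contains Aeron's cnc.dat file. */
    static boolean looksInitialized(Path aeronDir) {
        return Files.isDirectory(aeronDir) && Files.isRegularFile(aeronDir.resolve("cnc.dat"));
    }

    public static void main(String[] args) {
        // Falls back to the example path used in this document if aeron.dir is not set.
        Path dir = Paths.get(System.getProperty("aeron.dir", "/dev/shm/aeron"));
        if (looksInitialized(dir)) {
            System.out.println("Found driver files in " + dir);
        } else {
            System.out.println("No driver files in " + dir + "; is the Aeron Driver running with this aeron.dir?");
        }
    }
}
```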

tip

Use the same version of the Aeron Driver as the version that comes with the TimeBase client.

caution

If you run the Aeron Driver in a Docker container, do not use the --oom-kill-disable option. Otherwise, the container will just hang when it runs out of memory.

Embedded Aeron Driver

TimeBase offers an option to run the Aeron Driver as an embedded process inside TimeBase. This is useful for development and testing, but it's not recommended for production use.

See Using Embedded Aeron Driver for details.

Configuring TimeBase Server

Add to admin.properties:

TimeBase.aeron.enabled=true
TimeBase.topics.totalTermMemoryLimit=0MB

Set TimeBase.topics.totalTermMemoryLimit to the TotalTopicTermMemory value computed in the Memory Usage Considerations section.

For development purposes it's possible to turn off the limit using TimeBase.topics.totalTermMemoryLimit=-1.

tip

TimeBase.aeron.enabled=true may not be necessary for some versions of TimeBase where Topics are enabled by default.

Java options for the TimeBase server to configure an external Aeron Driver:

-DTimeBase.transport.aeron.external.driver.dir=/YOUR/AERON/DIR
-DTimeBase.transport.aeron.id.range=100:10000000

The TimeBase.transport.aeron.id.range option sets the range of Aeron stream IDs that the TimeBase server may use. Change that range if you share the Aeron Driver instance with applications other than TimeBase. For example, the Deltix Ember app may use Aeron for data replication with the same Aeron Driver, so it's necessary to ensure that stream IDs used by Ember do not overlap with the stream IDs specified in TimeBase.transport.aeron.id.range.
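
A quick way to validate a planned configuration is to parse the ranges and test them for overlap. The following is a minimal sketch: the class is hypothetical, it assumes the min:max format shown above with both bounds inclusive, and the second range is an invented value standing in for another application's stream IDs.

```java
/** Illustrative range type (hypothetical): parses "min:max" and checks for overlap. */
final class AeronIdRange {
    final int min;
    final int max;

    AeronIdRange(int min, int max) {
        if (min > max) {
            throw new IllegalArgumentException("min must not exceed max: " + min + ":" + max);
        }
        this.min = min;
        this.max = max;
    }

    /** Parses a value such as "100:10000000". Both bounds are treated as inclusive (an assumption). */
    static AeronIdRange parse(String value) {
        String[] parts = value.trim().split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected min:max but got: " + value);
        }
        return new AeronIdRange(Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim()));
    }

    /** Two inclusive ranges overlap when each one starts no later than the other ends. */
    boolean overlaps(AeronIdRange other) {
        return min <= other.max && other.min <= max;
    }

    public static void main(String[] args) {
        AeronIdRange timeBase = parse("100:10000000");
        AeronIdRange otherApp = parse("1:99"); // hypothetical range used by another application
        System.out.println("Ranges overlap: " + timeBase.overlaps(otherApp));
    }
}
```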

Using Embedded Aeron Driver

TimeBase offers an option to run the Aeron Driver as an embedded process inside TimeBase. This is useful for development and testing, but it's not recommended for production use.

To use the embedded Aeron Driver, add only this line to admin.properties:

TimeBase.aeron.enabled=true

Also ensure that you do not set any value for the TimeBase.transport.aeron.external.driver.dir property.

Configuring TimeBase Client

Java options for the TimeBase client to configure the Aeron Driver:

-DTimeBase.transport.aeron.directory=/YOUR/AERON/DIR

Alternatively, you can set this using the AERON_DIR environment variable.
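
Because a mismatch between the client's directory and the driver's directory is an easy mistake to make, an application may want to log the value it is about to use at startup. The sketch below is a diagnostic aid only, with a hypothetical class name. It assumes the system property takes precedence over the environment variable, which may not match how the TimeBase client itself resolves the setting.

```java
import java.util.Optional;

/** Illustrative diagnostic (hypothetical helper): reports which Aeron directory is configured. */
final class AeronDirResolver {
    static final String PROPERTY = "TimeBase.transport.aeron.directory";
    static final String ENV_VAR = "AERON_DIR";

    /** Returns the configured directory, checking the system property first (assumed precedence). */
    static Optional<String> resolve() {
        String fromProperty = System.getProperty(PROPERTY);
        if (fromProperty != null && !fromProperty.isEmpty()) {
            return Optional.of(fromProperty);
        }
        String fromEnv = System.getenv(ENV_VAR);
        return (fromEnv == null || fromEnv.isEmpty()) ? Optional.empty() : Optional.of(fromEnv);
    }

    public static void main(String[] args) {
        System.out.println(resolve()
                .map(dir -> "Aeron directory: " + dir)
                .orElse("Aeron directory is not configured"));
    }
}
```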

Running in Docker

When a TimeBase server or client is executed in a container (Docker, etc.), ensure that it shares the same folder on the host file system as the Aeron Driver process.

Mount the Aeron directory to each container. It's optimal to place the Aeron directory in /dev/shm (e.g. /dev/shm/aeron).

Topic Usage Caveats

The Topic API differs from the Stream API in a few significant ways.

Closing Topic Producer Before Shutdown

Before application shutdown, each MessageChannel returned by the TopicDB#createPublisher(...) API must be explicitly closed. Otherwise, all consumers of this Topic will get ClosedDueToDataLossException and stop processing new data. See also TopicDataLossHandler.
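
One way to make sure the channel is closed on shutdown is to let the publishing thread own it in a try-with-resources block and have a JVM shutdown hook request a stop and wait for that thread. The sketch below shows only the pattern: FakeChannel is a hypothetical stand-in for the MessageChannel returned by createPublisher(...), so that the example runs without a TimeBase server.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative shutdown pattern (hypothetical classes, no TimeBase dependency). */
final class GracefulPublisherShutdown {

    /** Stand-in for the channel returned by createPublisher(...). */
    static final class FakeChannel implements AutoCloseable {
        volatile boolean closed;
        long sent;

        void send() { sent++; }

        @Override
        public void close() { closed = true; }
    }

    /** Publishes until asked to stop; try-with-resources guarantees close() on every exit path. */
    static void publishUntilStopped(FakeChannel channel, AtomicBoolean stop) {
        try (FakeChannel c = channel) {
            while (!stop.get()) {
                c.send();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean stop = new AtomicBoolean(false);
        FakeChannel channel = new FakeChannel();

        Thread publisher = new Thread(() -> publishUntilStopped(channel, stop), "topic-publisher");
        publisher.setDaemon(true); // lets this demo exit when main returns; shutdown hooks still run
        publisher.start();

        // The hook only sets the flag and waits; the publisher thread itself closes the channel.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            stop.set(true);
            try {
                publisher.join(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Channel closed before exit: " + channel.closed);
        }));

        Thread.sleep(100); // let the publisher run briefly, then exit normally
    }
}
```

A shutdown hook does not run if the process is killed forcibly (for example with SIGKILL), so consumers should still be prepared to handle data loss.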

MessageChannel and MessagePoller Thread Safety

MessageChannel, MessagePoller, and other consumer-side API classes for Topics are not thread safe.

Do not call their methods (including .close()) from different threads without proper synchronization.
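
A simple way to respect this rule is thread confinement: one thread creates, uses, and closes the object, and other threads communicate with it only through a thread-safe flag. The sketch below illustrates the idea with a hypothetical FakePoller stand-in that throws if it is touched from a second thread. It is not the TimeBase MessagePoller.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative thread-confinement pattern (hypothetical classes, no TimeBase dependency). */
final class SingleThreadedConsumerLoop implements Runnable {

    /** Stand-in for a non-thread-safe poller; detects access from more than one thread. */
    static final class FakePoller implements AutoCloseable {
        private Thread owner;
        boolean closed;

        private void checkThread() {
            Thread current = Thread.currentThread();
            if (owner == null) {
                owner = current;
            } else if (owner != current) {
                throw new IllegalStateException("Accessed from a second thread: " + current.getName());
            }
        }

        int poll() { checkThread(); return 0; }

        @Override
        public void close() { checkThread(); closed = true; }
    }

    final FakePoller poller;
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    SingleThreadedConsumerLoop(FakePoller poller) {
        this.poller = poller;
    }

    /** Safe to call from any thread: it touches only the atomic flag, never the poller. */
    void requestStop() {
        stopRequested.set(true);
    }

    /** All poller calls, including close(), happen on the thread that executes run(). */
    @Override
    public void run() {
        try (FakePoller p = poller) {
            while (!stopRequested.get()) {
                p.poll();
                Thread.yield();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadedConsumerLoop loop = new SingleThreadedConsumerLoop(new FakePoller());
        Thread consumer = new Thread(loop, "topic-consumer");
        consumer.start();
        Thread.sleep(50);
        loop.requestStop(); // instead of calling poller.close() from this thread
        consumer.join();
        System.out.println("Poller closed by its owner thread: " + loop.poller.closed);
    }
}
```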

Additional Settings

Topic Settings

Term Buffer Size

On the transport level, the amount of data that the producer can send before the consumer has processed it is capped by the size of the corresponding buffers, which is proportional to Aeron's term buffer size. The default size of the term buffer in TimeBase Topics is 16 MB. You can change the size of the buffer by setting the corresponding property on TopicSettings during Topic creation:

TopicSettings settings = new TopicSettings()
        .setTermBufferLength(128 * 1024 * 1024);
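
Changing the term buffer length also changes how much memory a Topic needs, so the memory limits computed earlier should be revisited. The sketch below gives a rough estimate under stated assumptions: Aeron keeps three term buffers per log buffer and requires the term length to be a power of two between 64 KB and 1 GB. The class is hypothetical, metadata overhead is ignored, and the actual per-Topic usage in TimeBase may differ.

```java
/** Illustrative estimate (hypothetical helper) of term memory for a given term buffer length. */
final class TermBufferSizing {
    /** Aeron keeps three term buffers (partitions) per log buffer. */
    static final int TERM_PARTITIONS = 3;
    static final long MIN_TERM_LENGTH = 64L * 1024;
    static final long MAX_TERM_LENGTH = 1024L * 1024 * 1024;

    /** Aeron accepts only power-of-two term lengths within the allowed bounds. */
    static boolean isValidTermLength(long termLength) {
        return termLength >= MIN_TERM_LENGTH
                && termLength <= MAX_TERM_LENGTH
                && Long.bitCount(termLength) == 1;
    }

    /** Rough term memory for one Topic, in bytes, excluding metadata overhead. */
    static long estimatedTermMemoryBytes(long termLength) {
        if (!isValidTermLength(termLength)) {
            throw new IllegalArgumentException("Invalid term buffer length: " + termLength);
        }
        return TERM_PARTITIONS * termLength;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        System.out.println("Default (16 MB term): about " + estimatedTermMemoryBytes(16 * mb) / mb + " MB");
        System.out.println("128 MB term: about " + estimatedTermMemoryBytes(128 * mb) / mb + " MB");
    }
}
```

Under these assumptions the default 16 MB term buffer gives roughly 48 MB, which is in line with the figure of about 50 MB per Topic quoted in the Memory Usage Considerations section, while a 128 MB term buffer would need several hundred megabytes for a single Topic.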

Consumer Settings

TopicDataLossHandler

By default, whenever a Topic data consumer detects data loss, the consumer throws deltix.qsrv.hf.topic.consumer.ClosedDueToDataLossException and stops processing any further data.

This behavior can be changed by setting ConsumerPreferences#TopicDataLossHandler:

ConsumerPreferences preferences = new ConsumerPreferences().setTopicDataLossHandler(new TopicDataLossHandler() {
    @Override
    public boolean handleDataLoss() {
        // Data loss is detected.
        // TODO: Update process state to reflect data loss.

        return true; // Continue processing
    }
});
// topicDB is a TopicDB instance obtained from DXTickDB#getTopicDB()
MessageSource<InstrumentMessage> messageSource = topicDB.createConsumer("testTopic", preferences, null);

Code Example

TimeBase Topics offers two different APIs for consumers.

The following example shows how to create:

  • A Topic
  • A data producer
  • Different types of data consumers

package deltix.qsrv.hf.tickdb.topic;

import deltix.data.stream.MessageChannel;
import deltix.data.stream.MessageSource;
import deltix.qsrv.hf.pub.InstrumentMessage;
import deltix.qsrv.hf.pub.md.RecordClassDescriptor;
import deltix.qsrv.hf.tickdb.pub.DXTickDB;
import deltix.qsrv.hf.tickdb.pub.StreamConfigurationHelper;
import deltix.qsrv.hf.tickdb.pub.TickDBFactory;
import deltix.qsrv.hf.tickdb.pub.topic.MessagePoller;
import deltix.qsrv.hf.tickdb.pub.topic.MessageProcessor;
import deltix.qsrv.hf.tickdb.pub.topic.TopicDB;
import deltix.qsrv.hf.tickdb.pub.topic.exception.TopicNotFoundException;
import deltix.qsrv.hf.tickdb.pub.topic.settings.TopicSettings;
import deltix.timebase.api.messages.TradeMessage;
import deltix.util.io.idlestrat.YieldingIdleStrategy;

import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

public class TopicUsageExample {
    /**
     * Needs configured Aeron dir property and running Aeron driver on that dir:
     * -DTimeBase.transport.aeron.directory=/YOUR/AERON/DIR
     */
    public static void main(String[] args) throws InterruptedException {
        String tbServerUrl = "dxtick://localhost:8011";

        try (DXTickDB db = TickDBFactory.openFromUrl(tbServerUrl, false)) {
            TopicDB topicDB = db.getTopicDB();

            try {
                topicDB.deleteTopic("testTopic");
            } catch (TopicNotFoundException ignore) {
                // Ignore
            }

            createTopic(topicDB);

            AtomicBoolean stopPublisher = new AtomicBoolean(false);
            AtomicBoolean stopConsumer = new AtomicBoolean(false);

            Thread producerThread = new Thread(() -> runPublisher(topicDB, stopPublisher));
            Thread consumerThread = new Thread(() -> runConsumer(topicDB, stopConsumer));
            Thread pollingConsumerThread = new Thread(() -> runPollingConsumer(topicDB, stopConsumer));

            producerThread.start();
            consumerThread.start();
            pollingConsumerThread.start();

            System.out.println("Running...");

            // Let it run for some time
            Thread.sleep(20_000);

            System.out.println("Stopping...");

            stopConsumer.set(true);
            stopPublisher.set(true);

            // Consumer thread uses blocking read, so we may need to interrupt it to stop it.
            // Polling consumer thread uses non-blocking read, so we don't need to interrupt it to stop it.
            consumerThread.interrupt();

            producerThread.join();
            consumerThread.join();
            pollingConsumerThread.join();

            topicDB.deleteTopic("testTopic");
        }
    }

    private static void createTopic(TopicDB topicDB) {
        RecordClassDescriptor rcd = StreamConfigurationHelper.mkUniversalTradeMessageDescriptor();

        TopicSettings settings = new TopicSettings()
                .setSinglePublisherUdpMode("localhost"); // Address of the publisher. You have to specify it on topic creation.
        topicDB.createTopic("testTopic", new RecordClassDescriptor[]{rcd}, settings);
    }

    private static void runPublisher(TopicDB topicDB, AtomicBoolean stopPublisher) {
        TradeMessage msg = new TradeMessage();
        msg.setSymbol("ABC");

        Random rng = new Random();

        try (MessageChannel<InstrumentMessage> channel = topicDB.createPublisher("testTopic", null, null)) {
            while (!stopPublisher.get()) {
                // Prepare message
                msg.setSymbol("ABC");
                msg.setSize(10 + rng.nextInt(100));
                msg.setPrice(100.00 * rng.nextDouble());

                // Send it
                channel.send(msg);
            }
        }
    }

    private static void runConsumer(TopicDB topicDB, AtomicBoolean stopConsumer) {
        double totalSize = 0;
        try (MessageSource<InstrumentMessage> messageSource = topicDB.createConsumer("testTopic", null, null)) {
            // Note: "messageSource.next()" for topics will always return true.
            while (messageSource.next() && !stopConsumer.get()) {
                TradeMessage tradeMsg = (TradeMessage) messageSource.getMessage();
                double size = tradeMsg.getSize();
                totalSize += size;
            }
        } finally {
            System.out.println("Total size for consumer: " + totalSize);
        }
    }

    // The recommended way to consume data from a topic
    private static void runPollingConsumer(TopicDB topicDB, AtomicBoolean stopConsumer) {
        YieldingIdleStrategy idleStrategy = new YieldingIdleStrategy();

        CustomMessageProcessor processor = new CustomMessageProcessor();

        try (MessagePoller messagePoller = topicDB.createPollingConsumer("testTopic", null)) {

            while (!stopConsumer.get()) {
                // Non-blocking call
                int messagesProcessing = messagePoller.processMessages(1000, processor);

                // Not necessary. This is only needed if your thread does not have other work to do, and you want to reduce CPU usage.
                idleStrategy.idle(messagesProcessing);
            }
        } finally {
            System.out.println("Total size for polling consumer: " + processor.totalSize);
        }
    }

    private static class CustomMessageProcessor implements MessageProcessor {
        double totalSize = 0;

        @Override
        public void process(InstrumentMessage msg) {
            // Message processing logic

            TradeMessage tradeMsg = (TradeMessage) msg;
            double size = tradeMsg.getSize();
            totalSize += size;
        }
    }
}