ClickHouse

Overview

Use our proprietary open source connector to replicate TimeBase stream data to and from a ClickHouse database.

ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP) that lets you generate analytical reports using SQL queries in real time.

TimeBase stores time series events as Messages, and each type of event has its own message class assigned to it in TimeBase. A message class has a set of fields (attributes) that characterize, describe, and identify each specific type of event. In object-oriented programming languages, messages can be seen as classes, each with a specific set of fields. Messages are stored in Streams chronologically by their timestamps for each symbol. Refer to the Basic Concepts page to learn more about TimeBase main principles and data structure.

To replicate TimeBase stream data to ClickHouse, we take objects and classes from a particular TimeBase stream and unfold them so that each field corresponds to a particular ClickHouse table column. The Timestamp, Partition and Instrument columns are auto-generated and common to all ClickHouse tables, where Instrument + Timestamp = PrimaryKey. ClickHouse tables are named after TimeBase streams. A table row is created for each TimeBase message in chronological order.

Let's take a look at a simplified example that shows how a message with fixed-type and polymorphic objects is transformed into a ClickHouse table.

For the example, we take a message with two fixed-type fields, Symbol and Timestamp, and a polymorphic array entries with two types of entries (classes), Trade and BBO, each having a specific set of fields, as shown in the illustration above.

Such a data structure can be transformed into a ClickHouse table as follows:

| Timestamp | Symbol | entries.type | entries.price_float64 | entries.size_float64 | entries.askPrice_float64 | entries.askSize_float64 | entries.bidPrice_float64 | entries.bidSize_float64 | entries.exchange_string |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2021-08-25T07:00:00.025Z | btcusd | entries.trade, entries.bbo | 1 | 2 | 7 | 8 | 9 | 10 | kraken, kraken |
| 2021-08-25T07:00:00.026Z | btcusd | entries.bbo | | | 3 | 4 | 5 | 6 | kraken |

In the table above, the first row includes two entries of different types and the second row includes just one. The output table has a column for each field of the source message, including the fields of the objects in the polymorphic array entries.

The column naming convention:

  • columns for fixed-type objects' fields are named after the particular fields as is, for example Symbol.
  • columns for polymorphic objects' fields follow this pattern: object-name.field-name_field-data-type, for example entries.price_float64.
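
The unfolding and the naming convention above can be illustrated with a short Python sketch. It is not the replicator's code: the function name flatten_message, the TYPE_SUFFIX table, the dictionary layout of the message and the sample values are all assumptions made for illustration. The sketch also assumes that every entries.* column holds one slot per entry, padded with None where an entry class lacks the field.

```python
# Hypothetical mapping from Python value types to the simplified
# type suffixes used in the example column names.
TYPE_SUFFIX = {float: "float64", str: "string"}


def flatten_message(message, array_field="entries"):
    """Unfold one message into a flat row: one column per field."""
    # Fixed-type fields keep their names as is.
    row = {name: value for name, value in message.items() if name != array_field}

    entries = message.get(array_field, [])
    # One extra column records the class of each entry.
    row[f"{array_field}.type"] = [f"{array_field}.{e['type']}" for e in entries]

    for index, entry in enumerate(entries):
        for field, value in entry.items():
            if field == "type":
                continue
            # Pattern: object-name.field-name_field-data-type
            column = f"{array_field}.{field}_{TYPE_SUFFIX[type(value)]}"
            # Assumption: pad with None so each array has one slot per entry.
            row.setdefault(column, [None] * len(entries))[index] = value
    return row


# Illustrative message with one Trade entry and one BBO entry.
message = {
    "Timestamp": "2021-08-25T07:00:00.025Z",
    "Symbol": "btcusd",
    "entries": [
        {"type": "trade", "price": 1.0, "size": 2.0, "exchange": "kraken"},
        {"type": "bbo", "askPrice": 7.0, "askSize": 8.0,
         "bidPrice": 9.0, "bidSize": 10.0, "exchange": "kraken"},
    ],
}

row = flatten_message(message)
for column, value in row.items():
    print(column, "=", value)
```

Each key of the resulting row corresponds to one table column, so a message with two entry classes yields the union of both classes' fields.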

Now let's look at a more realistic example, a Kraken stream.

Kraken stream schema example:
// The stream schema lists all the included classes, enums and objects, along with their fields and data types.

DURABLE STREAM "kraken" (
CLASS "deltix.timebase.api.messages.MarketMessage" 'Market Message' (
"currencyCode" 'Currency Code' INTEGER SIGNED (16),
"originalTimestamp" 'Original Timestamp' TIMESTAMP,
"sequenceNumber" 'Sequence Number' INTEGER,
"sourceId" 'Source Id' VARCHAR ALPHANUMERIC (10)
);
ENUM "deltix.timebase.api.messages.universal.PackageType" 'Package Type' (
"VENDOR_SNAPSHOT" = 0,
"PERIODICAL_SNAPSHOT" = 1,
"INCREMENTAL_UPDATE" = 2
);
CLASS "deltix.timebase.api.messages.universal.PackageHeader" 'Package Header'
UNDER "deltix.timebase.api.messages.MarketMessage" (
"packageType" 'Package Type' "deltix.timebase.api.messages.universal.PackageType" NOT NULL
);
CLASS "deltix.timebase.api.messages.universal.BaseEntry" 'Base Entry' (
"contractId" 'Contract ID' VARCHAR ALPHANUMERIC (10),
"exchangeId" 'Exchange Code' VARCHAR ALPHANUMERIC (10),
"isImplied" 'Is Implied' BOOLEAN
);
CLASS "deltix.timebase.api.messages.universal.BasePriceEntry" 'Base Price Entry'
UNDER "deltix.timebase.api.messages.universal.BaseEntry" (
"numberOfOrders" 'Number Of Orders' INTEGER,
"participantId" 'Participant' VARCHAR,
"price" 'Price' FLOAT DECIMAL64,
"quoteId" 'Quote ID' VARCHAR,
"size" 'Size' FLOAT DECIMAL64
);
ENUM "deltix.timebase.api.messages.QuoteSide" (
"BID" = 0,
"ASK" = 1
);
CLASS "deltix.timebase.api.messages.universal.L1Entry" 'L1Entry'
UNDER "deltix.timebase.api.messages.universal.BasePriceEntry" (
"isNational" 'Is National' BOOLEAN,
"side" 'Side' "deltix.timebase.api.messages.QuoteSide" NOT NULL
);
CLASS "deltix.timebase.api.messages.universal.L2EntryNew" 'L2EntryNew'
UNDER "deltix.timebase.api.messages.universal.BasePriceEntry" (
"level" 'Level Index' INTEGER NOT NULL SIGNED (16),
"side" 'Side' "deltix.timebase.api.messages.QuoteSide" NOT NULL
);
ENUM "deltix.timebase.api.messages.BookUpdateAction" 'Book Update Action' (
"INSERT" = 0,
"UPDATE" = 1,
"DELETE" = 2
);
CLASS "deltix.timebase.api.messages.universal.L2EntryUpdate" 'L2EntryUpdate'
UNDER "deltix.timebase.api.messages.universal.BasePriceEntry" (
"action" 'Action' "deltix.timebase.api.messages.BookUpdateAction" NOT NULL,
"level" 'Level Index' INTEGER NOT NULL SIGNED (16),
"side" 'Side' "deltix.timebase.api.messages.QuoteSide"
);
ENUM "deltix.timebase.api.messages.AggressorSide" 'Aggressor Side' (
"BUY" = 0,
"SELL" = 1
);
ENUM "deltix.timebase.api.messages.TradeType" (
"REGULAR_TRADE" = 0,
"AUCTION_CLEARING_PRICE" = 1,
"CORRECTION" = 2,
"CANCELLATION" = 3,
"UNKNOWN" = 4
);
CLASS "deltix.timebase.api.messages.universal.TradeEntry" 'Trade Entry'
UNDER "deltix.timebase.api.messages.universal.BaseEntry" (
"buyerNumberOfOrders" 'Buyer Number Of Orders' INTEGER,
"buyerOrderId" 'Buyer Order ID' VARCHAR,
"buyerParticipantId" 'Buyer Participant ID' VARCHAR,
"condition" 'Condition' VARCHAR,
"matchId" 'Match ID' VARCHAR,
"price" 'Price' FLOAT DECIMAL64,
"sellerNumberOfOrders" 'Seller Number Of Orders' INTEGER,
"sellerOrderId" 'Seller Order ID' VARCHAR,
"sellerParticipantId" 'Seller Participant ID' VARCHAR,
"side" 'Side' "deltix.timebase.api.messages.AggressorSide",
"size" 'Size' FLOAT DECIMAL64,
"tradeType" 'Trade Type' "deltix.timebase.api.messages.TradeType"
);
CLASS "deltix.qsrv.hf.plugins.data.kraken.types.KrakenTradeEntry" 'Kraken Trade Entry'
UNDER "deltix.timebase.api.messages.universal.TradeEntry" (
"orderType" 'Order Type' CHAR
);
ENUM "deltix.timebase.api.messages.DataModelType" (
"LEVEL_ONE" = 0,
"LEVEL_TWO" = 1,
"LEVEL_THREE" = 2,
"MAX" = 3
);
CLASS "deltix.timebase.api.messages.universal.BookResetEntry" 'Book Reset Entry'
UNDER "deltix.timebase.api.messages.universal.BaseEntry" (
"modelType" 'Model Type' "deltix.timebase.api.messages.DataModelType" NOT NULL,
"side" 'Side' "deltix.timebase.api.messages.QuoteSide"
);
CLASS "deltix.qsrv.hf.plugins.data.kraken.types.KrakenPackageHeader" 'Kraken Package Header'
UNDER "deltix.timebase.api.messages.universal.PackageHeader" (
"entries" 'Entries'
ARRAY(OBJECT("deltix.timebase.api.messages.universal.L1Entry", "deltix.timebase.api.messages.universal.L2EntryNew",
"deltix.timebase.api.messages.universal.L2EntryUpdate", "deltix.qsrv.hf.plugins.data.kraken.types.KrakenTradeEntry",
"deltix.timebase.api.messages.universal.BookResetEntry") NOT NULL) NOT NULL
);
ENUM "deltix.timebase.api.messages.service.DataConnectorStatus" 'Data Connector Status' (
"INITIAL" = 0,
"CONNECTED_BY_USER" = 1,
"AUTOMATICALLY_RESTORED" = 2,
"DISCONNECTED_BY_USER" = 3,
"DISCONNECTED_BY_COMPLETED_BATCH" = 4,
"DISCONNECTED_BY_VENDOR_AND_RECONNECTING" = 5,
"DISCONNECTED_BY_VENDOR_AND_HALTED" = 6,
"DISCONNECTED_BY_ERROR_AND_RECONNECTING" = 7,
"DISCONNECTED_BY_ERROR_AND_HALTED" = 8,
"RECOVERING_BEGIN" = 9,
"LIVE_BEGIN" = 10
);
CLASS "deltix.timebase.api.messages.service.ConnectionStatusChangeMessage" 'Connection Status Change Message' (
"cause" 'Cause' VARCHAR,
"status" 'Status' "deltix.timebase.api.messages.service.DataConnectorStatus"
);
ENUM "deltix.timebase.api.messages.status.SecurityStatus" (
"FEED_CONNECTED" = 0,
"FEED_DISCONNECTED" = 1,
"TRADING_STARTED" = 2,
"TRADING_STOPPED" = 3
);
CLASS "deltix.timebase.api.messages.status.SecurityStatusMessage" 'Security Status Change Message'
UNDER "deltix.timebase.api.messages.MarketMessage" (
"cause" 'Cause' VARCHAR,
"exchangeId" 'Exchange Code' VARCHAR ALPHANUMERIC (10),
"originalStatus" 'Original Status' VARCHAR,
"status" 'Status' "deltix.timebase.api.messages.status.SecurityStatus"
);
)
OPTIONS (POLYMORPHIC; PERIODICITY = 'IRREGULAR'; HIGHAVAILABILITY = FALSE)

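The ClickHouse table has the following structure. Fixed-type message fields become columns such as currencyCode_N_Int16 Nullable(Int16), whereas fields of a polymorphic object are prefixed with the array name and wrapped in Array, for example entries.exchangeId_N_String Array(Nullable(String)). In this example, fields declared without NOT NULL get an _N marker in the column name and a Nullable type.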

ClickHouse table example:
CREATE TABLE clickhouse.kraken (
`partition` Date,
`timestamp` DateTime64(9),
`instrument` String,
`type` String,
`currencyCode_N_Int16` Nullable(Int16),
`originalTimestamp_N_DateTime64_3` Nullable(DateTime64(3)),
`sequenceNumber_N_Int64` Nullable(Int64),
`sourceId_N_String` Nullable(String),
`packageType_PackageType` Enum16(
'VENDOR_SNAPSHOT' = 0,
'PERIODICAL_SNAPSHOT' = 1,
'INCREMENTAL_UPDATE' = 2
),
`entries.type` Array(Nullable(String)),
`entries.contractId_N_String` Array(Nullable(String)),
`entries.exchangeId_N_String` Array(Nullable(String)),
`entries.isImplied_N_UInt8` Array(Nullable(UInt8)),
`entries.numberOfOrders_N_Int64` Array(Nullable(Int64)),
`entries.participantId_N_String` Array(Nullable(String)),
`entries.price_N_Decimal128_12` Array(Nullable(Decimal(38, 12))),
`entries.quoteId_N_String` Array(Nullable(String)),
`entries.size_N_Decimal128_12` Array(Nullable(Decimal(38, 12))),
`entries.isNational_N_UInt8` Array(Nullable(UInt8)),
`entries.side_QuoteSide` Array(Enum16('BID' = 0, 'ASK' = 1)),
`entries.level_Int16` Array(Int16),
`entries.action_BookUpdateAction` Array(Enum16('INSERT' = 0, 'UPDATE' = 1, 'DELETE' = 2)),
`entries.side_N_QuoteSide` Array(Nullable(Enum16('BID' = 0, 'ASK' = 1))),
`entries.buyerNumberOfOrders_N_Int64` Array(Nullable(Int64)),
`entries.buyerOrderId_N_String` Array(Nullable(String)),
`entries.buyerParticipantId_N_String` Array(Nullable(String)),
`entries.condition_N_String` Array(Nullable(String)),
`entries.matchId_N_String` Array(Nullable(String)),
`entries.sellerNumberOfOrders_N_Int64` Array(Nullable(Int64)),
`entries.sellerOrderId_N_String` Array(Nullable(String)),
`entries.sellerParticipantId_N_String` Array(Nullable(String)),
`entries.side_N_AggressorSide` Array(Nullable(Enum16('BUY' = 0, 'SELL' = 1))),
`entries.tradeType_N_TradeType` Array(
Nullable(
Enum16(
'REGULAR_TRADE' = 0,
'AUCTION_CLEARING_PRICE' = 1,
'CORRECTION' = 2,
'CANCELLATION' = 3,
'UNKNOWN' = 4
)
)
),
`entries.orderType_N_String` Array(Nullable(String)),
`entries.modelType_DataModelType` Array(
Enum16(
'LEVEL_ONE' = 0,
'LEVEL_TWO' = 1,
'LEVEL_THREE' = 2,
'MAX' = 3
)
),
`cause_N_String` Nullable(String),
`status_N_DataConnectorStatus` Nullable(
Enum16(
'INITIAL' = 0,
'CONNECTED_BY_USER' = 1,
'AUTOMATICALLY_RESTORED' = 2,
'DISCONNECTED_BY_USER' = 3,
'DISCONNECTED_BY_COMPLETED_BATCH' = 4,
'DISCONNECTED_BY_VENDOR_AND_RECONNECTING' = 5,
'DISCONNECTED_BY_VENDOR_AND_HALTED' = 6,
'DISCONNECTED_BY_ERROR_AND_RECONNECTING' = 7,
'DISCONNECTED_BY_ERROR_AND_HALTED' = 8,
'RECOVERING_BEGIN' = 9,
'LIVE_BEGIN' = 10
)
),
`exchangeId_N_String` Nullable(String),
`originalStatus_N_String` Nullable(String),
`status_N_SecurityStatus` Nullable(
Enum16(
'FEED_CONNECTED' = 0,
'FEED_DISCONNECTED' = 1,
'TRADING_STARTED' = 2,
'TRADING_STOPPED' = 3
)
)
) ENGINE = MergeTree() PARTITION BY partition
ORDER BY
(timestamp, instrument) SETTINGS index_granularity = 8192

This table can be displayed in the TimeBase integration with Tabix (business intelligence application and SQL editor for ClickHouse) as follows:

Failover Support

If pollingIntervalMs is configured, the replicator periodically checks the list of streams to be replicated and starts replication if it has been stopped for any reason. If this parameter is not configured, the replication process can be restarted manually. In both cases, the following flow applies:

  1. Locate the MAX timestamp - a suspected crash/break point.

    SELECT MAX(EventTime) AS EventTime FROM table_name
  2. Delete data with this timestamp.

    DELETE FROM table_name WHERE EventTime = max_time
  3. Continue replication from the MAX timestamp.
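
The recovery flow above can be tried out end to end with a small Python sketch. It uses an in-memory SQLite database from the standard library purely as a stand-in for ClickHouse, so it shows the logic of the three steps, not the replicator's actual implementation or ClickHouse's delete syntax. The function name recover_resume_point, the table name kraken and the payload column are hypothetical.

```python
import sqlite3


def recover_resume_point(conn, table):
    """Drop rows at the MAX timestamp and return it as the resume point."""
    cur = conn.cursor()
    # Step 1: locate the MAX timestamp, the suspected break point.
    max_time = cur.execute(f"SELECT MAX(EventTime) FROM {table}").fetchone()[0]
    if max_time is None:
        return None  # empty table: nothing to clean up
    # Step 2: delete data with this timestamp, since it may be incomplete.
    cur.execute(f"DELETE FROM {table} WHERE EventTime = ?", (max_time,))
    conn.commit()
    # Step 3: the caller continues replication from this timestamp.
    return max_time


# Stand-in table; the last timestamp (3) may have been written only partially.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kraken (EventTime INTEGER, payload TEXT)")
conn.executemany(
    "INSERT INTO kraken VALUES (?, ?)",
    [(1, "a"), (2, "b"), (3, "c"), (3, "d")],
)

resume_from = recover_resume_point(conn, "kraken")
remaining = conn.execute("SELECT COUNT(*) FROM kraken").fetchone()[0]
print(resume_from, remaining)
```

Deleting every row at the MAX timestamp before resuming means that messages sharing that timestamp are written again in full, so an interrupted write should not leave duplicates or gaps.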

Deployment

  1. Run TimeBase

    # start TimeBase Community Edition
    docker run --rm -d \
    -p 8011:8011 \
    --name=timebase-server \
    --ulimit nofile=65536:65536 \
    finos/timebase-ce-server:latest
  2. Run the replicator in Docker or directly via java -jar

Configuration

configuration example
# ClickHouse parameters
clickhouse:
  url: jdbc:clickhouse://tbc-clickhouse:8123/default
  username: read
  password:
  database: tbc

# TimeBase parameters
timebase:
  url: dxtick://tbc-timebase:8011
  username:
  password:

# Replicator parameters
replication:
  pollingIntervalMs: 60_000
  flushMessageCount: 10_000
  flushTimeoutMs: 60_000
  streams: []

ClickHouse parameters:

  • url - ClickHouse JDBC connection URL.
  • database - ClickHouse database.
  • username - login username.
  • password - login password.

TimeBase parameters:

  • url - TimeBase connection URL.
  • username - login username.
  • password - login password.

Replicator parameters:

  • streams - an array of TimeBase streams that will be replicated to ClickHouse. May include wildcards.
  • pollingIntervalMs - time interval in milliseconds to check for new streams to be replicated.
  • flushMessageCount - data is replicated in a batch once the defined message count is reached.
  • flushTimeoutMs - if flushMessageCount has not been reached, data is replicated anyway once flushTimeoutMs elapses.
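
The interplay between flushMessageCount and flushTimeoutMs can be sketched in Python as a buffer that flushes on whichever condition is met first. This is a minimal illustration, not the replicator's implementation: the class name BatchBuffer, the sink callback and the injectable clock are hypothetical, and the sketch assumes the timeout is measured from the previous flush.

```python
import time


class BatchBuffer:
    """Collect messages and flush them by count or by timeout."""

    def __init__(self, flush_message_count, flush_timeout_ms, sink,
                 clock=time.monotonic):
        self.flush_message_count = flush_message_count
        self.flush_timeout_s = flush_timeout_ms / 1000.0
        self.sink = sink          # called with a list of messages per batch
        self.clock = clock        # injectable so the example is deterministic
        self.pending = []
        self.last_flush = clock()

    def add(self, message):
        self.pending.append(message)
        self.maybe_flush()

    def maybe_flush(self):
        if not self.pending:
            return
        count_reached = len(self.pending) >= self.flush_message_count
        timed_out = self.clock() - self.last_flush >= self.flush_timeout_s
        if count_reached or timed_out:
            self.sink(list(self.pending))
            self.pending.clear()
            self.last_flush = self.clock()


# Deterministic usage with a fake clock (seconds).
now = [0.0]
batches = []
buffer = BatchBuffer(flush_message_count=3, flush_timeout_ms=60_000,
                     sink=batches.append, clock=lambda: now[0])

for message in ["m1", "m2", "m3"]:
    buffer.add(message)      # the third message reaches the count
buffer.add("m4")             # stays pending: count not reached
now[0] = 61.0                # more than flushTimeoutMs has passed
buffer.maybe_flush()         # the timeout forces the flush

print(batches)
```

A larger flushMessageCount produces fewer and bigger inserts, while flushTimeoutMs bounds how long a message on a quiet stream waits before it reaches ClickHouse.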

Data Type Mappings

  • Refer to ClickHouse Data Types to learn more about data types supported by ClickHouse DBMS.
  • Refer to Data Types to learn more about data types supported by TimeBase.
  • Refer to SchemaProcessor to view data mappings for the ClickHouse replicator.

| TimeBase Type | ClickHouse Type |
| --- | --- |
| BOOLEAN | UInt8 |
| CHAR/VARCHAR | String |
| TIMESTAMP | DateTime64 |
| ENUM | Enum16 |
| FLOAT/BINARY (32) | Float32 |
| FLOAT/BINARY (64) | Float64 |
| FLOAT/DECIMAL64 | Decimal128 |
| INTEGER/SIGNED (8) | Int8 |
| INTEGER/SIGNED (16) | Int16 |
| INTEGER/SIGNED (32) | Int32 |
| INTEGER/SIGNED (64) | Int64 |
| TIMEOFDAY | Int32 |
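
As a rough illustration of how these mappings combine with the column naming seen in the Kraken table example, the Python sketch below derives a column definition from a field name and a TimeBase type. It is a simplified reading of that example, not the SchemaProcessor logic: the function column_definition and the TYPE_MAP dictionary are hypothetical, and TIMESTAMP, ENUM and FLOAT/DECIMAL64 are left out because their column names carry extra details (precision or enum name) that this sketch does not model.

```python
# Subset of the mapping table: types whose ClickHouse name has no parameters.
TYPE_MAP = {
    "BOOLEAN": "UInt8",
    "CHAR": "String",
    "VARCHAR": "String",
    "FLOAT/BINARY (32)": "Float32",
    "FLOAT/BINARY (64)": "Float64",
    "INTEGER/SIGNED (8)": "Int8",
    "INTEGER/SIGNED (16)": "Int16",
    "INTEGER/SIGNED (32)": "Int32",
    "INTEGER/SIGNED (64)": "Int64",
    "TIMEOFDAY": "Int32",
}


def column_definition(field, timebase_type, nullable=True, array=None):
    """Build a ClickHouse column definition for one TimeBase field."""
    ch_type = TYPE_MAP[timebase_type]
    # Nullable fields get an _N marker in the name and a Nullable type.
    name = f"{field}_N_{ch_type}" if nullable else f"{field}_{ch_type}"
    sql_type = f"Nullable({ch_type})" if nullable else ch_type
    # Fields of objects inside an array are prefixed and wrapped in Array.
    if array is not None:
        name = f"{array}.{name}"
        sql_type = f"Array({sql_type})"
    return f"`{name}` {sql_type}"


print(column_definition("currencyCode", "INTEGER/SIGNED (16)"))
print(column_definition("level", "INTEGER/SIGNED (16)", nullable=False, array="entries"))
print(column_definition("isImplied", "BOOLEAN", array="entries"))
```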