Client Libraries
Supported client APIs:
Client language | Supported versions | Supported OS | Project GitHub Repository | Artifacts Repository |
---|---|---|---|---|
Java | 11-14 | macOS, Windows, Linux | Link | Link |
Python | 3.6, 3.7, 3.8, 3.9, 3.10 | macOS, Windows, Linux | Link | Link |
C++ | - | macOS, Windows, Linux | Link | Link |
Download the `.whl` file from the Python artifacts repository (release assets), then run the following command to install the Python client:
pip install timebase_client-6.0.1-py2.py3-none-any.whl
info
Please refer to TimeBase Tutorial for more information.
Prerequisites
Before running samples, please make sure you have a TimeBase Server running on your local machine on port 8011.
tip
Refer to TickDB API and Deployment for more information.
Sample Message
- Java
/**
 * Sample bar message used by the Java examples.
 * Extends InstrumentMessage with price, volume and exchange fields.
 */
public class MyBarMessage extends InstrumentMessage {
    public double closePrice;
    public double openPrice;
    public double highPrice;
    public double lowPrice;
    public double volume;
    public String exchange;

    /**
     * Appends a JSON-like rendering of this message to the supplied builder.
     *
     * @param sb builder that receives the text
     * @return the same builder, so calls can be chained
     */
    @Override
    public StringBuilder toString(StringBuilder sb) {
        sb.append("{ \"$type\": \"MyBarMessage\"");
        sb.append(", \"closePrice\": ").append(closePrice);
        sb.append(", \"openPrice\": ").append(openPrice);
        sb.append(", \"highPrice\": ").append(highPrice);
        sb.append(", \"lowPrice\": ").append(lowPrice);
        sb.append(", \"volume\": ").append(volume);
        // exchange is text, so it is quoted; an unset value is rendered as a bare null
        sb.append(", \"exchange\": ");
        if (exchange != null) {
            sb.append('"').append(exchange).append('"');
        } else {
            sb.append("null");
        }
        // close the object opened by the first append
        sb.append(" }");
        return sb;
    }
}
Create Stream
- Java
- Python
- C++
/**
 * Creates the durable "mybars" stream on the local TimeBase server.
 * The stream schema is obtained by introspecting MyBarMessage; a stream
 * that already exists under the same key is deleted first.
 */
public void createStream() {
    final String url = "dxtick://localhost:8011";
    final String key = "mybars";

    // try-with-resources closes the connection when the block exits
    try (DXTickDB timebase = TickDBFactory.createFromUrl(url)) {
        // The connection has to be opened before it can be used.
        timebase.open(false);

        // Derive the record schema from the MyBarMessage class
        RecordClassDescriptor schema =
                Introspector.createEmptyMessageIntrospector().introspectRecordClass(MyBarMessage.class);

        // Options for a durable (persistent storage) fixed-type stream with maximum distribution
        StreamOptions settings = StreamOptions.fixedType(StreamScope.DURABLE, key, "Bar Messages", 0, schema);

        // Drop a stream left over from a previous run, if there is one
        DXTickStream existing = timebase.getStream(key);
        if (existing != null) {
            existing.delete();
        }

        // Create the stream and report its key
        DXTickStream created = timebase.createStream(key, settings);
        System.out.println(created.getKey() + " successfully created using Introspector");
    }
}
# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'

# Pre-bind the handle so the cleanup below is safe even when the
# connection could not be created.
db = None
try:
    # Create timebase connection
    db = tbapi.TickDb.createFromUrl(timebase)
    # Open in write mode
    db.open(False)

    # QQL create stream DDL
    barsQQL = """
CREATE DURABLE STREAM "bars1min" 'bars1min' (
CLASS "deltix.timebase.api.messages.MarketMessage" 'Market Message' (
STATIC "originalTimestamp" TIMESTAMP = NULL,
STATIC "currencyCode" 'Currency Code' INTEGER = 999,
STATIC "sequenceNumber" '' INTEGER = NULL,
STATIC "sourceId" '' VARCHAR = NULL
) NOT INSTANTIABLE;
CLASS "deltix.timebase.api.messages.BarMessage" 'Bar Message' UNDER "deltix.timebase.api.messages.MarketMessage" (
"exchangeId" 'Exchange Code' VARCHAR,
"close" 'Close' FLOAT DECIMAL64,
"open" 'Open' FLOAT DECIMAL64 RELATIVE TO "close",
"high" 'High' FLOAT DECIMAL64 RELATIVE TO "close",
"low" 'Low' FLOAT DECIMAL64 RELATIVE TO "close",
"volume" 'Volume' FLOAT DECIMAL64
);
)
OPTIONS (FIXEDTYPE; PERIODICITY = '1I'; HIGHAVAILABILITY = TRUE)
COMMENT 'bars1min'
"""

    # Execute QQL and check the result
    cursor = db.executeQuery(barsQQL)
    try:
        if cursor.next():
            message = cursor.getMessage()
            print('Query result: ' + message.messageText)
    finally:
        # The cursor should be closed whether or not reading succeeded
        if cursor is not None:
            cursor.close()
finally:
    # Database connection should be closed anyway
    if db is not None and db.isOpen():
        db.close()
        print("Connection " + timebase + " closed.")
void create_stream() {
// Connect to TimeBase
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
db->open(false);
// Prepare QQL create stream DDL
std::string qql = "CREATE DURABLE STREAM \"bars1min\" 'bars1min' ( \
CLASS \"deltix.timebase.api.messages.MarketMessage\" 'Market Message' ( \
STATIC \"originalTimestamp\" TIMESTAMP = NULL, \
STATIC \"currencyCode\" 'Currency Code' INTEGER = 999, \
STATIC \"sequenceNumber\" '' INTEGER = NULL, \
STATIC \"sourceId\" '' VARCHAR = NULL \
) NOT INSTANTIABLE; \
CLASS \"deltix.timebase.api.messages.BarMessage\" 'Bar Message' UNDER \"deltix.timebase.api.messages.MarketMessage\" ( \
\"exchangeId\" 'Exchange Code' VARCHAR, \
\"close\" 'Close' FLOAT DECIMAL, \
\"open\" 'Open' FLOAT DECIMAL RELATIVE TO \"close\", \
\"high\" 'High' FLOAT DECIMAL RELATIVE TO \"close\", \
\"low\" 'Low' FLOAT DECIMAL RELATIVE TO \"close\", \
\"volume\" 'Volume' FLOAT DECIMAL \
); \
) \
OPTIONS(FIXEDTYPE; PERIODICITY = '1I'; HIGHAVAILABILITY = TRUE) \
COMMENT 'bars1min'";
// Execute query
unique_ptr<TickCursor> cursor(db->executeQuery(qql, std::vector<DxApi::QueryParameter>()));
// Read result
InstrumentMessage header;
if (cursor->next(&header)) {
std::string details, errorType, text;
DataReader &reader = cursor->getReader();
reader.readUTF8(details);
reader.readUTF8(errorType);
int8_t errorLevel = reader.readEnum8();
reader.readUTF8(text);
std::cout << "Query result: " << errorType << " with text: " << text << std::endl;
}
// Close cursor and connection
cursor->close();
db->close();
}
info
Please refer to TimeBase Tutorial for more information.
Write
- Java
- Python
- C++
/**
 * Connects to the local TimeBase server and sends two MyBarMessage
 * instances (AAPL, then GOOG) to the existing "mybars" stream.
 */
public void writeStream() {
    final String url = "dxtick://localhost:8011";
    final String key = "mybars";

    // 1. Connect; try-with-resources closes the connection when the block exits
    try (DXTickDB timebase = TickDBFactory.createFromUrl(url)) {
        // The connection has to be opened before it can be used.
        timebase.open(false);

        // 2. Look up the previously created stream
        DXTickStream bars = timebase.getStream(key);

        // 3. Write messages through a loader, which is closed automatically as well
        try (TickLoader sink = bars.createLoader()) {
            // 3.1 A single message object is filled in and sent twice
            MyBarMessage bar = new MyBarMessage();

            // 3.2 Message time: 9 March 2020, 00:00:00 UTC, as epoch milliseconds
            long epochMs = LocalDateTime.of(2020, Month.MARCH, 9, 0, 0, 0)
                    .atZone(ZoneId.of("UTC"))
                    .toInstant()
                    .toEpochMilli();
            bar.setTimeStampMs(epochMs);

            // 3.3 First bar: Apple
            bar.setSymbol("AAPL");
            bar.openPrice = 263.75;
            bar.highPrice = 278.09;
            bar.lowPrice = 263.00;
            bar.closePrice = 266.17;
            bar.volume = 71_690_000;
            bar.exchange = "NYSE";

            System.out.println("Sending 1st message: " + bar);
            sink.send(bar);

            // 3.4 Second bar: Google. Only the fields assigned below change;
            //     the timestamp and exchange set above are carried over.
            bar.setSymbol("GOOG");
            bar.openPrice = 1205.3;
            bar.highPrice = 1254.76;
            bar.lowPrice = 1200.00;
            bar.closePrice = 1215.56;
            bar.volume = 33_700_000;

            System.out.println("Sending 2nd message: " + bar);
            sink.send(bar);
        }
    }
}
# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'

# Pre-bind both handles so the cleanup code can tell what was actually created
db = None
loader = None
try:
    # Create timebase connection
    db = tbapi.TickDb.createFromUrl(timebase)
    # Open in read-write mode
    db.open(False)
    print('Connected to ' + timebase)

    # Define name of the stream
    streamKey = 'tradeBBO'
    # Get stream from the timebase
    stream = db.getStream(streamKey)
    # Create a Message Loader for the selected stream and provide loading options
    loader = stream.createLoader(tbapi.LoadingOptions())

    # Create BestBidOffer message
    bboMessage = tbapi.InstrumentMessage()
    # Define message type name according to the Timebase schema type name.
    # For polymorphic streams, each message must define typeName so that
    # messages can be distinguished at the Timebase Server level.
    bboMessage.typeName = 'deltix.timebase.api.messages.BestBidOfferMessage'

    # Create Trade message
    tradeMessage = tbapi.InstrumentMessage()
    # Same rule as above: typeName selects the schema type of the message
    tradeMessage.typeName = 'deltix.timebase.api.messages.TradeMessage'

    print('Start loading to ' + streamKey)
    # Unix epoch, used to express the current UTC time as an offset
    epoch = datetime(1970, 1, 1)
    for i in range(100):
        # Time elapsed since the epoch (UTC)
        now = datetime.utcnow() - epoch
        # Message timestamp as Epoch time in nanoseconds, built from the integer
        # timedelta components. total_seconds() already contains the microseconds,
        # so adding them on top would count them twice, and float arithmetic
        # cannot represent nanosecond-scale epoch values exactly.
        ns = (now.days * 86400 + now.seconds) * 1000000000 + now.microseconds * 1000

        if i % 2 == 0:
            # Define symbol and timestamp for the message
            tradeMessage.symbol = 'AAA'
            tradeMessage.timestamp = ns
            # Define other message properties
            tradeMessage.originalTimestamp = 0
            # 'undefined' currency code
            tradeMessage.currencyCode = 999
            tradeMessage.exchangeId = 'NYSE'
            tradeMessage.price = 10.0 + i * 2.2
            tradeMessage.size = 20.0 + i * 3.3
            tradeMessage.aggressorSide = 'BUY'
            tradeMessage.netPriceChange = 30.0 + i * 4.4
            tradeMessage.eventType = 'TRADE'
            # Send message
            loader.send(tradeMessage)
        else:
            bboMessage.symbol = 'USGG5YR'
            bboMessage.timestamp = ns
            bboMessage.originalTimestamp = 0
            bboMessage.currencyCode = 999
            bboMessage.sequenceNumber = 0
            bboMessage.bidPrice = 126.0 + i * 2.2
            bboMessage.bidSize = 127.0 + i * 3.3
            bboMessage.bidExchangeId = 'NYSE'
            bboMessage.offerPrice = 128.0 + i * 4.4
            bboMessage.offerSize = 129.0 + i * 5.5
            bboMessage.offerExchangeId = 'NYSE'
            # Send message
            loader.send(bboMessage)
finally:
    try:
        # Close the Message Loader even when sending failed part-way through
        if loader is not None:
            loader.close()
            loader = None
    finally:
        # Database connection should be closed anyway
        if db is not None and db.isOpen():
            db.close()
            print("Connection " + timebase + " closed.")
void write_stream() {
// Connect to TimeBase
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
db->open(false);
// Get stream
TickStream *stream = db->getStream("bars1min");
// Create loader
unique_ptr<TickLoader> loader(stream->createLoader(LoadingOptions()));
DataWriter &writer = loader->getWriter();
unsigned __int64 timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
// send first message
auto instrument = loader->getInstrumentId("AAPL");
loader->beginMessage(0, instrument, timestamp);
writer.writeUTF8(std::string("NYSE")); // exchange id
writer.writeUInt64(deltix::dfp::Decimal64(266.17f).toUnderlying()); // close
writer.writeUInt64(deltix::dfp::Decimal64(263.75f).toUnderlying()); // open
writer.writeUInt64(deltix::dfp::Decimal64(278.09f).toUnderlying()); // high
writer.writeUInt64(deltix::dfp::Decimal64(263.00f).toUnderlying()); // low
writer.writeUInt64(deltix::dfp::Decimal64(71690000.0f).toUnderlying()); // volume
loader->send();
// send second message
instrument = loader->getInstrumentId("GOOG");
loader->beginMessage(0, instrument, timestamp);
writer.writeUTF8(std::string("NYSE")); // exchange id
writer.writeUInt64(deltix::dfp::Decimal64(1215.56f).toUnderlying()); // close
writer.writeUInt64(deltix::dfp::Decimal64(1205.3f).toUnderlying()); // open
writer.writeUInt64(deltix::dfp::Decimal64(1254.76f).toUnderlying()); // high
writer.writeUInt64(deltix::dfp::Decimal64(1200.00f).toUnderlying()); // low
writer.writeUInt64(deltix::dfp::Decimal64(33700000.0f).toUnderlying()); // volume
loader->send();
// Close loader and connection
loader->close();
db->close();
}
info
Please refer to TimeBase Tutorial for more information.
Read
- Java
- Python
- C++
/**
 * Connects to the local TimeBase server and prints every MyBarMessage
 * stored for the GOOG symbol in the "mybars" stream.
 */
public void readStream() {
    final String url = "dxtick://localhost:8011";

    // 1. Connect; try-with-resources closes the connection when the block exits
    try (DXTickDB timebase = TickDBFactory.createFromUrl(url)) {
        // The connection has to be opened before it can be used.
        timebase.open(false);

        // 2. Look up the previously created stream
        DXTickStream bars = timebase.getStream("mybars");

        // 3. Subscription filters
        //    entities: only GOOG (if null, all stream entities will be used)
        String[] symbols = { "GOOG" };
        //    types: only MyBarMessage
        String[] messageTypes = { MyBarMessage.class.getName() };

        // 4. Open a cursor over the selection and print each message it yields;
        //    the cursor is closed automatically
        try (TickCursor reader = bars.select(Long.MIN_VALUE, new SelectionOptions(), messageTypes, symbols)) {
            while (reader.next()) {
                MyBarMessage bar = (MyBarMessage) reader.getMessage();
                System.out.println("Read message: " + bar.toString());
            }
        }
    }
}
# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'

# Pre-bind the handle so the cleanup below is safe even when the
# connection could not be created.
db = None
try:
    # Create timebase connection
    db = tbapi.TickDb.createFromUrl(timebase)
    # Open in read-only mode
    db.open(True)
    print('Connected to ' + timebase)

    # Define name of the stream
    streamKey = 'ticks'
    # Get stream from the timebase
    stream = db.getStream(streamKey)

    # Create cursor with empty subscription
    cursor = db.createCursor(None, tbapi.SelectionOptions())
    try:
        # Create subscription dynamically. This happens inside the try block
        # so the cursor is closed even if one of the subscription calls fails.
        # 1. Add entities
        entities = ['AAA', 'AAPL']
        cursor.addEntities(entities)

        # 2. Subscribe to the Trade and BestBidOffer Messages
        types = [
            'deltix.timebase.api.messages.TradeMessage',
            'deltix.timebase.api.messages.BestBidOfferMessage'
        ]
        cursor.addTypes(types)

        # 3. Subscribe to the data stream(s)
        cursor.addStreams([stream])

        # Define subscription start time
        startDate = datetime(2010, 1, 1, 0, 0)
        # Start time is Epoch time in milliseconds
        startTime = calendar.timegm(startDate.timetuple()) * 1000

        # 4. Reset cursor to the subscription time
        cursor.reset(startTime)

        while cursor.next():
            message = cursor.getMessage()
            # Message time is Epoch time in nanoseconds; convert to seconds
            seconds = message.timestamp/1e9
            messageTime = datetime.utcfromtimestamp(seconds)

            if message.typeName == 'deltix.timebase.api.messages.TradeMessage':
                print("Trade (" + str(messageTime) + "," + message.symbol + ") price: " + str(message.price))
            elif message.typeName == 'deltix.timebase.api.messages.BestBidOfferMessage':
                print("BBO (" + str(messageTime) + "," + message.symbol + ") bid price: " + str(message.bidPrice) + ", ask price: " + str(message.offerPrice))
    finally:
        # cursor should be closed anyway
        cursor.close()
        cursor = None
finally:
    # database connection should be closed anyway
    if db is not None and db.isOpen():
        db.close()
        print("Connection " + timebase + " closed.")
void read_stream() {
// Connect to TimeBase
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
db->open(false);
// Get stream
TickStream *stream = db->getStream("bars1min");
// Specify list of identities to filter
std::vector<std::string> identities = { "AAPL" };
// Specify list of types to filter
std::vector<std::string> types = { "deltix.timebase.api.messages.BarMessage" };
// Create cursor
unique_ptr<TickCursor> cursor(stream->select(0, SelectionOptions(), &types, &identities));
InstrumentMessage header;
while (cursor->next(&header)) {
DataReader &reader = cursor->getReader();
int64_t timestamp = header.timestamp;
std::string symbol, exchange;
double open, close, high, low, volume;
symbol = *cursor->getInstrument(header.entityId);
reader.readUTF8(exchange);
close = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
open = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
high = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
low = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
volume = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
std::cout << "Read message: " <<
" symbol: " << symbol <<
" exchange: " << exchange <<
", open: " << open << ", close: " << close <<
", high: " << high << ", low: " << low <<
", volume: " << volume <<
std::endl;
}
// Close cursor and connection
cursor->close();
db->close();
}
info
Please refer to TimeBase Tutorial for more information.