Client Libraries
Supported client APIs:
tip
Contact your admin for access credentials to artifacts repositories.
Client language | Supported versions | Supported OS | Artifacts Repository |
---|---|---|---|
Java | 11-17 | macOS, Windows, Linux | Link |
Python | 3.8, 3.9, 3.10, 3.11, 3.12 | macOS, Windows, Linux | Link |
.NET | netstandard2.0 | macOS, Windows, Linux | Link |
C++ | - | macOS, Windows, Linux | Link |
You can install TimeBase Python client using this command
# install specific version
pip install dxapi==5.5.23 --extra-index-url https://<user>:<password>@nexus.deltixhub.com/repository/epm-rtc-public-python/simple
# install latest version
pip install dxapi --extra-index-url https://<user>:<password>@nexus.deltixhub.com/repository/epm-rtc-public-python/simple
# command for centos7 (supported for 5.4.18 version only)
pip install dxapi==5.4.18+centos --extra-index-url https://<user>:<password>@nexus.deltixhub.com/repository/epm-rtc-public-python/simple
Prerequisites
Before running samples, please make sure you have a TimeBase Server running on your local machine on port 8011.
tip
Refer to TickDB API and Deployment for more information.
Sample Message
- Java
- .NET
public class MyBarMessage extends InstrumentMessage {
public double closePrice;
public double openPrice;
public double highPrice;
public double lowPrice;
public double volume;
public String exchange;
@Override
public StringBuilder toString(StringBuilder sb) {
sb.append("{ \"$type\": \"MyBarMessage\"");
sb.append(", \"closePrice\": ").append(closePrice);
sb.append(", \"openPrice\": ").append(openPrice);
sb.append(", \"highPrice\": ").append(highPrice);
sb.append(", \"lowPrice\": ").append(lowPrice);
sb.append(", \"volume\": ").append(volume);
return sb;
}
}
public class MyBarMessage : InstrumentMessage
{
public double ClosePrice;
public double OpenPrice;
public double HighPrice;
public double LowPrice;
public double Volume;
public String Exchange;
//
// Summary:
// Appends an object state to a given StringBuilder in a form of JSON. A StringBuilder
// that was used to append the object to.
public override StringBuilder ToString(StringBuilder builder)
{
builder.Append("{ \"$type\": \"MyBarMessage\"");
builder.Append(", \"closePrice\": ").Append(ClosePrice);
builder.Append(", \"openPrice\": ").Append(OpenPrice);
builder.Append(", \"highPrice\": ").Append(HighPrice);
builder.Append(", \"lowPrice\": ").Append(LowPrice);
builder.Append(", \"volume\": ").Append(Volume);
return builder;
}
}
Create Stream
- Java
- Python
- C++
- .NET
public void createStream() {
String connection = "dxtick://localhost:8011";
// Create Timebase connection using connection string
try (DXTickDB db = TickDBFactory.createFromUrl(connection)) {
// It require to open database connection.
db.open(false);
String streamName = "mybars";
// Introspect MyBarMessage.class to define it's schema
Introspector introspector = Introspector.createEmptyMessageIntrospector();
RecordClassDescriptor descriptor = introspector.introspectRecordClass(MyBarMessage.class);
// Define stream Options to creating Durable (Persistent storage) stream with maximum distribution with given descriptor
StreamOptions options = StreamOptions.fixedType(StreamScope.DURABLE, streamName, "Bar Messages", 0, descriptor);
// Create stream in database.
// first check if stream already exists in database, and delete it
DXTickStream bars = db.getStream(streamName);
if (bars != null)
bars.delete();
// create stream
DXTickStream stream = db.createStream(streamName, options);
System.out.println(stream.getKey() + " successfully created using Introspector");
}
}
import dxapi
# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'
try:
# Create timebase connection
db = dxapi.TickDb.createFromUrl(timebase)
# Open in write mode
db.open(False)
# Create stream from generated schema
schema = db.generateSchema(['deltix.timebase.api.messages.BarMessage'])
options = dxapi.StreamOptions()
options.schema(schema)
stream = db.createStream('Bars', options)
# Create stream from manually created schema
marketMessageType = dxapi.TypeDef(
'deltix.timebase.api.messages.MarketMessage',
'Market Message',
[
dxapi.FieldDef('originalTimestamp', 'Original Timestamp', None, dxapi.DataTypeDef('TIMESTAMP'), False),
dxapi.FieldDef('currencyCode', 'Currency Code', None, dxapi.DataTypeDef('INTEGER', 'INT64'), True, 999),
dxapi.FieldDef('sequenceNumber', None, None, dxapi.DataTypeDef('INTEGER', 'INT64'), True, 0),
dxapi.FieldDef('sourceId', None, None, dxapi.DataTypeDef('VARCHAR', 'ALPHANUMERIC(10)'), False)
],
False, None, True
)
myBarsMessageType = dxapi.TypeDef(
'org.messages.MyBarMessage',
'My Bar Message',
[
dxapi.FieldDef('close', 'Close Price', None, dxapi.DataTypeDef('FLOAT', 'DECIMAL')),
dxapi.FieldDef('open', 'Open Price', None, dxapi.DataTypeDef('FLOAT', 'DECIMAL'), False, None, 'close'),
dxapi.FieldDef('high', 'High', None, dxapi.DataTypeDef('FLOAT', 'DECIMAL'), False, None, 'close'),
dxapi.FieldDef('low', 'Low', None, dxapi.DataTypeDef('FLOAT', 'DECIMAL'), False, None, 'close'),
dxapi.FieldDef('volume', 'Volume', None, dxapi.DataTypeDef('FLOAT', 'DECIMAL')),
dxapi.FieldDef('exchangeId', 'Exchange Code', None, dxapi.DataTypeDef('VARCHAR', 'ALPHANUMERIC(10)'), False)
],
False,
'deltix.timebase.api.messages.MarketMessage'
)
schema = dxapi.SchemaDef(
[myBarsMessageType],
[marketMessageType, myBarsMessageType]
)
options = dxapi.StreamOptions()
options.schema(schema)
stream = db.createStream('MyBars', options)
finally: # database connection should be closed anyway
if (db.isOpen()):
db.close()
print("Connection " + timebase + " closed.")
void create_stream() {
// Connect to TimeBase
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
db->open(false);
// Prepare QQL create stream DDL
std::string qql = "CREATE DURABLE STREAM \"bars1min\" 'bars1min' ( \
CLASS \"deltix.timebase.api.messages.MarketMessage\" 'Market Message' ( \
STATIC \"originalTimestamp\" TIMESTAMP = NULL, \
STATIC \"currencyCode\" 'Currency Code' INTEGER = 999, \
STATIC \"sequenceNumber\" '' INTEGER = NULL, \
STATIC \"sourceId\" '' VARCHAR = NULL \
) NOT INSTANTIABLE; \
CLASS \"deltix.timebase.api.messages.BarMessage\" 'Bar Message' UNDER \"deltix.timebase.api.messages.MarketMessage\" ( \
\"exchangeId\" 'Exchange Code' VARCHAR, \
\"close\" 'Close' FLOAT DECIMAL, \
\"open\" 'Open' FLOAT DECIMAL RELATIVE TO \"close\", \
\"high\" 'High' FLOAT DECIMAL RELATIVE TO \"close\", \
\"low\" 'Low' FLOAT DECIMAL RELATIVE TO \"close\", \
\"volume\" 'Volume' FLOAT DECIMAL \
); \
) \
OPTIONS(FIXEDTYPE; PERIODICITY = '1I'; HIGHAVAILABILITY = TRUE) \
COMMENT 'bars1min'";
// Execute query
unique_ptr<TickCursor> cursor(db->executeQuery(qql, std::vector<DxApi::QueryParameter>()));
// Read result
InstrumentMessage header;
if (cursor->next(&header)) {
std::string details, errorType, text;
DataReader &reader = cursor->getReader();
reader.readUTF8(details);
reader.readUTF8(errorType);
int8_t errorLevel = reader.readEnum8();
reader.readUTF8(text);
std::cout << "Query result: " << errorType << " with text: " << text << std::endl;
}
// Close cursor and connection
cursor->close();
db->close();
}
public void CreateStream()
{
String connection = "dxtick://localhost:8011";
// Create Timebase connection using connection string
using (ITickDb db = TickDbFactory.CreateFromUrl(connection))
{
// It require to open connection using read-write mode.
db.Open(false);
String streamName = "mybars";
// Create stream in database.
// Delete it, if its already exists...
ITickStream bars = db.GetStream(streamName);
if (bars != null)
bars.Delete();
StreamOptions streamOptions = new StreamOptions(StreamScope.Durable, streamName, "Bar Messages", 0);
RecordClassDescriptor descriptor = Introspector.CreateMessageIntrospector().IntrospectRecordClass(null); // MyBarMessage.class);
streamOptions.SetFixedType(descriptor);
// create stream
ITickStream stream = db.CreateStream(streamName, streamOptions);
Console.WriteLine(stream.Key + " successfully created using Introspector");
}
}
Write
- Java
- Python
- C++
- .NET
public void writeStream() {
String connection = "dxtick://localhost:8011";
// 1. Create Timebase connection using connection string
try (DXTickDB db = TickDBFactory.createFromUrl(connection)) {
// It require to open database connection.
db.open(false);
// 2. Get already created stream from database
String streamName = "mybars";
DXTickStream stream = db.getStream(streamName);
//
// 3. Writing new messages to the stream
//
// 3.1 Create Loader
try (TickLoader loader = stream.createLoader()) {
// 3.2 Create message to send
MyBarMessage message = new MyBarMessage();
// 3.3 set time for this message
LocalDateTime localTime = LocalDateTime.of(2020, Month.MARCH, 9, 0, 0, 0);
Instant utc = localTime.atZone(ZoneId.of("UTC")).toInstant();
message.setTimeStampMs(utc.toEpochMilli());
// 3.4 define field values
message.setSymbol("AAPL"); // Apple
message.openPrice = 263.75;
message.highPrice = 278.09;
message.lowPrice = 263.00;
message.closePrice = 266.17;
message.volume = 71_690_000;
message.exchange = "NYSE";
// 3.5. send first message
System.out.println("Sending 1st message: " + message);
loader.send(message);
// 3.6. reuse message to send another
message.setSymbol("GOOG"); // Apple
message.openPrice = 1205.3;
message.highPrice = 1254.76;
message.lowPrice = 1200.00;
message.closePrice = 1215.56;
message.volume = 33_700_000;
// 3.7. send second message
System.out.println("Sending 2nd message: " + message);
loader.send(message);
}
}
}
# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'
try:
# Create timebase connection
db = dxapi.TickDb.createFromUrl(timebase)
# Open in read-write mode
db.open(False)
print('Connected to ' + timebase)
# Define name of the stream
streamKey = 'bars1min'
# Get stream from the timebase
stream = db.getStream(streamKey)
# Create a Message Loader for the selected stream and provide loading options
loader = stream.createLoader(dxapi.LoadingOptions())
# Create Bar message
barMessage = dxapi.InstrumentMessage()
# Define message type name according to the Timebase schema type name
# For the polymorphic streams, each message should have defined typeName to distinct messages on Timebase Server level.
barMessage.typeName = 'deltix.timebase.api.messages.BarMessage'
print('Start loading to ' + streamKey)
# get current time in UTC
now = datetime.utcnow() - datetime(1970, 1, 1)
# Define message timestamp as Epoch time in nanoseconds
ns = now.total_seconds() * 1e9 + now.microseconds * 1000;
barMessage.instrumentType = 'EQUITY'
barMessage.symbol = 'AAPL'
barMessage.timestamp = ns;
barMessage.exchangeId = 'NYSE'
barMessage.open = 263.75
barMessage.high = 278.09
barMessage.low = 263.00
barMessage.close = 266.17
barMessage.volume = 71_690_000
# Send message
loader.send(barMessage)
barMessage.symbol = 'GOOG'
barMessage.timestamp = ns;
barMessage.exchangeId = 'NYSE'
barMessage.open = 1205.3
barMessage.high = 1254.76
barMessage.low = 1200.00
barMessage.close = 1215.56
barMessage.volume = 33_700_000
# Send message
loader.send(barMessage)
# close Message Loader
loader.close()
loader = None
finally:
# database connection should be closed anyway
if db.isOpen():
db.close()
print("Connection " + timebase + " closed.")
void write_stream() {
// Connect to TimeBase
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
db->open(false);
// Get stream
TickStream *stream = db->getStream("bars1min");
// Create loader
unique_ptr<TickLoader> loader(stream->createLoader(LoadingOptions()));
DataWriter &writer = loader->getWriter();
unsigned __int64 timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
// send first message
auto instrument = loader->getInstrumentId(InstrumentIdentity(InstrumentType::EQUITY, "AAPL"));
loader->beginMessage(0, instrument, timestamp);
writer.writeUTF8(std::string("NYSE")); // exchange id
writer.writeUInt64(deltix::dfp::Decimal64(266.17f).toUnderlying()); // close
writer.writeUInt64(deltix::dfp::Decimal64(263.75f).toUnderlying()); // open
writer.writeUInt64(deltix::dfp::Decimal64(278.09f).toUnderlying()); // high
writer.writeUInt64(deltix::dfp::Decimal64(263.00f).toUnderlying()); // low
writer.writeUInt64(deltix::dfp::Decimal64(71690000.0f).toUnderlying()); // volume
loader->send();
// send second message
instrument = loader->getInstrumentId(InstrumentIdentity(InstrumentType::EQUITY, "GOOG"));
loader->beginMessage(0, instrument, timestamp);
writer.writeUTF8(std::string("NYSE")); // exchange id
writer.writeUInt64(deltix::dfp::Decimal64(1215.56f).toUnderlying()); // close
writer.writeUInt64(deltix::dfp::Decimal64(1205.3f).toUnderlying()); // open
writer.writeUInt64(deltix::dfp::Decimal64(1254.76f).toUnderlying()); // high
writer.writeUInt64(deltix::dfp::Decimal64(1200.00f).toUnderlying()); // low
writer.writeUInt64(deltix::dfp::Decimal64(33700000.0f).toUnderlying()); // volume
loader->send();
// Close loader and connection
loader->close();
db->close();
}
public void WriteStream()
{
String connection = "dxtick://localhost:8011";
// Create Timebase connection using connection string
using (ITickDb db = TickDbFactory.CreateFromUrl(connection))
{
// It require to open connection using read-write mode.
db.Open(false);
// 2. Get already created stream from database
string streamName = "mybars";
ITickStream stream = db.GetStream(streamName);
// 3.1 Create Loader
using (ITickLoader loader = stream.CreateLoader())
{
// 3.2 Create message to send
MyBarMessage message = new MyBarMessage();
// 3.3 set time for this message
message.Timestamp = new HdDateTime(new DateTime(2020, 3, 9, 0, 0, 0));
// 3.4 define field values
message.Symbol = "AAPL"; // Apple
message.OpenPrice = 263.75;
message.HighPrice = 278.09;
message.LowPrice = 263.00;
message.ClosePrice = 266.17;
message.Volume = 71_690_000;
message.Exchange = "NYSE";
// 3.5. send first message
Console.WriteLine("Sending 1st message: " + message);
loader.Send(message);
// 3.6. reuse message to send another
message.Symbol = "GOOG"; // Apple
message.OpenPrice = 1205.3;
message.HighPrice = 1254.76;
message.LowPrice = 1200.00;
message.ClosePrice = 1215.56;
message.Volume = 33_700_000;
// 3.7. send second message
Console.WriteLine("Sending 2nd message: " + message);
loader.Send(message);
}
}
}
Read
- Java
- Python
- C++
- .NET
public void readStream() {
String connection = "dxtick://localhost:8011";
// 1. Create Timebase connection using connection string
try (DXTickDB db = TickDBFactory.createFromUrl(connection)) {
// It require to open database connection.
db.open(false);
// 2. Get already created stream from database
String streamName = "mybars";
DXTickStream stream = db.getStream(streamName);
// 3.1. Define List of entities to subscribe (if null, all stream entities will be used)
// Use GOOG
String[] entities = new String[] { "GOOG" };
// 3.2. Define list of types to subscribe - select only "MyBarMessage" messages
String typeName = MyBarMessage.class.getName();
String[] types = new String[] { typeName };
// 3.3. Create a new 'cursor' to read messages
try (TickCursor cursor = stream.select(Long.MIN_VALUE, new SelectionOptions(), types, entities)) {
// 3.4 Iterate cursor and get messages
while (cursor.next()) {
MyBarMessage message = (MyBarMessage) cursor.getMessage();
System.out.println("Read message: " + message.toString());
}
}
}
}
# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'
try:
# Create timebase connection
db = dxapi.TickDb.createFromUrl(timebase)
# Open in read-only mode
db.open(True)
print('Connected to ' + timebase)
# Define name of the stream
streamKey = 'ticks'
# Get stream from the timebase
stream = db.getStream(streamKey)
# Create cursor with empty subscription
cursor = db.createCursor(None, dxapi.SelectionOptions())
# Create subscription dynamically
# 1. Add entities
entities = [
dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'AAA'),
dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'AAPL')
]
cursor.addEntities(entities)
# 2. Subscribe to the Trade and BestBidOffer Messages
types = [
'deltix.timebase.api.messages.TradeMessage',
'deltix.timebase.api.messages.BestBidOfferMessage'
]
cursor.addTypes(types)
# 3. Subscribe to the data stream(s)
cursor.addStreams([stream])
# Define subscription start time
time = datetime(2010, 1, 1, 0, 0)
# Start time is Epoch time in milliseconds
startTime = calendar.timegm(time.timetuple()) * 1000
# 4. Reset cursor to the subscription time
cursor.reset(startTime)
try:
while cursor.next():
message = cursor.getMessage()
# Message time is Epoch time in nanoseconds
time = message.timestamp/1e9
messageTime = datetime.utcfromtimestamp(time)
if message.typeName == 'deltix.timebase.api.messages.TradeMessage':
print("Trade (" + str(messageTime) + "," + message.symbol + ") price: " + str(message.price))
elif message.typeName == 'deltix.timebase.api.messages.BestBidOfferMessage':
print("BBO (" + str(messageTime) + "," + message.symbol + ") bid price: " + str(message.bidPrice) + ", ask price: " + str(message.offerPrice))
finally:
# cursor should be closed anyway
cursor.close()
cursor = None
finally:
# database connection should be closed anyway
if db.isOpen():
db.close()
print("Connection " + timebase + " closed.")
void read_stream() {
// Connect to TimeBase
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
db->open(false);
// Get stream
TickStream *stream = db->getStream("bars1min");
// Specify list of identities to filter
std::vector<InstrumentIdentity> identities = { InstrumentIdentity(InstrumentType::EQUITY, "AAPL") };
// Specify list of types to filter
std::vector<std::string> types = { "deltix.timebase.api.messages.BarMessage" };
// Create cursor
unique_ptr<TickCursor> cursor(stream->select(0, SelectionOptions(), &types, &identities));
InstrumentMessage header;
while (cursor->next(&header)) {
DataReader &reader = cursor->getReader();
int64_t timestamp = header.timestamp;
std::string symbol, exchange;
double open, close, high, low, volume;
symbol = *cursor->getSymbolName(header.entityId);
reader.readUTF8(exchange);
close = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
open = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
high = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
low = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
volume = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
std::cout << "Read message: " <<
" symbol: " << symbol <<
" exchange: " << exchange <<
", open: " << open << ", close: " << close <<
", high: " << high << ", low: " << low <<
", volume: " << volume <<
std::endl;
}
// Close cursor and connection
cursor->close();
db->close();
}
public void ReadStream()
{
String connection = "dxtick://localhost:8011";
// Create Timebase connection using connection string
using (ITickDb db = TickDbFactory.CreateFromUrl(connection))
{
// It require to open connection using read-write mode.
db.Open(false);
// 2. Get already created stream from database
String streamName = "mybars";
ITickStream stream = db.GetStream(streamName);
// 3.1. Define List of entities to subscribe (if null, all stream entities will be used)
// Use GOOG
String[] entities = new String[] { "GOOG" };
IInstrumentIdentity[] ids = new IInstrumentIdentity[]
{
new ConstantInstrumentKey(InstrumentType.Equity, "GOOG")
};
// 3.2. Define list of types to subscribe - select only "MyBarMessage" messages
String[] types = new String[] { typeof(MyBarMessage).Name };
// 3.3. Create a new 'cursor' to read messages
using (ITickCursor cursor = stream.Select(DateTime.MinValue, new SelectionOptions(), types, ids))
{
// 3.4 Iterate cursor and get messages
while (cursor.Next())
{
MyBarMessage message = (MyBarMessage) cursor.GetMessage();
Console.WriteLine("Read message: " + message.ToString());
}
}
}
}